Merge pull request #365 from anmol098/fix/utilization_of_linguist_instead_of_own_colors_definition

Download manager created for processing network interactions
Alexander Sergeev authored on 2023-02-16 22:27:35 +01:00, committed by GitHub
6 changed files with 487 additions and 2809 deletions


@@ -7,3 +7,5 @@ altair==4.1.0
altair-saver==0.5.0
pytz==2021.1
humanize==3.3.0
httpx==0.23.3
PyYAML==6.0

File diff suppressed because it is too large.

sources/download_manager.py (new file, 202 lines)

@@ -0,0 +1,202 @@
from hashlib import md5
from json import dumps
from string import Template
from typing import Awaitable, Dict, Callable, Optional
from httpx import AsyncClient
from yaml import safe_load
from github import AuthenticatedUser
GITHUB_API_QUERIES = {
"repositories_contributed_to": """
{
user(login: "$username") {
repositoriesContributedTo(last: 100, includeUserRepositories: true) {
nodes {
isFork
name
owner {
login
}
}
}
}
}""",
"repository_committed_dates": """
{
repository(owner: "$owner", name: "$name") {
defaultBranchRef {
target {
... on Commit {
history(first: 100, author: { id: "$id" }) {
edges {
node {
committedDate
}
}
}
}
}
}
}
}""",
"user_repository_list": """
{
user(login: "$username") {
repositories(orderBy: {field: CREATED_AT, direction: ASC}, last: 100, affiliations: [OWNER, COLLABORATOR], isFork: false) {
edges {
node {
primaryLanguage {
name
}
name
owner {
login
}
}
}
}
}
}
""",
"repository_commit_list": """
{
repository(owner: "$owner", name: "$name") {
refs(refPrefix: "refs/heads/", orderBy: {direction: DESC, field: TAG_COMMIT_DATE}, first: 100) {
edges {
node {
... on Ref {
target {
... on Commit {
history(first: 100, author: { id: "$id" }) {
edges {
node {
... on Commit {
additions
deletions
committedDate
}
}
}
}
}
}
}
}
}
}
}
}
"""
}
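# Example (hypothetical username, illustration only): each entry above is a plain
# string.Template body, so a query is rendered by substituting its $-placeholders,
# which is exactly what get_remote_graphql() does below:
#   Template(GITHUB_API_QUERIES["repositories_contributed_to"]).substitute(username="octocat")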
async def init_download_manager(waka_key: str, github_key: str, user: AuthenticatedUser):
"""
Initialize download manager:
- Setup headers for GitHub GraphQL requests.
- Launch static queries in background.
:param waka_key: WakaTime API token.
:param github_key: GitHub API token.
:param user: GitHub current user info.
"""
await DownloadManager.load_remote_resources({
"linguist": "https://cdn.jsdelivr.net/gh/github/linguist@master/lib/linguist/languages.yml",
"waka_latest": f"https://wakatime.com/api/v1/users/current/stats/last_7_days?api_key={waka_key}",
"waka_all": f"https://wakatime.com/api/v1/users/current/all_time_since_today?api_key={waka_key}",
"github_stats": f"https://github-contributions.vercel.app/api/v1/{user.login}"
}, {
"Authorization": f"Bearer {github_key}"
})
class DownloadManager:
"""
Class for handling and caching all kinds of requests.
Two types of queries are considered:
- Static queries: queries that take few arguments and should be executed only once
Example: queries to the WakaTime API or to GitHub linguist
- Dynamic queries: queries that take many arguments and should be executed multiple times
Example: GraphQL queries to the GitHub API
DownloadManager launches all static queries asynchronously upon initialization and caches their results.
It also executes dynamic queries upon request and caches the results.
"""
_client = AsyncClient(timeout=60.0)
_REMOTE_RESOURCES_CACHE = dict()
@staticmethod
async def load_remote_resources(resources: Dict[str, str], github_headers: Dict[str, str]):
"""
Prepare DownloadManager for GitHub API queries and start all static queries.
:param resources: Dictionary of static queries, "IDENTIFIER": "URL".
:param github_headers: Dictionary of headers for GitHub API queries.
"""
for resource, url in resources.items():
DownloadManager._REMOTE_RESOURCES_CACHE[resource] = DownloadManager._client.get(url)
DownloadManager._client.headers = github_headers
@staticmethod
async def _get_remote_resource(resource: str, convertor: Optional[Callable[[bytes], Dict]]) -> Dict:
"""
Receive the result of a static query, awaiting it if it hasn't finished yet.
If the query wasn't cached previously, cache it.
NB! The response is cached before parsing, so a cached erroneous response raises again on access.
:param resource: Static query identifier.
:param convertor: Optional function to convert `response.content` to a dict.
By default `response.json()` is used.
:return: Response dictionary.
"""
if isinstance(DownloadManager._REMOTE_RESOURCES_CACHE[resource], Awaitable):
    res = await DownloadManager._REMOTE_RESOURCES_CACHE[resource]
    DownloadManager._REMOTE_RESOURCES_CACHE[resource] = res
else:
    # Already awaited and cached on a previous call - reuse the stored response.
    res = DownloadManager._REMOTE_RESOURCES_CACHE[resource]
if res.status_code == 200:
if convertor is None:
return res.json()
else:
return convertor(res.content)
else:
raise Exception(f"Query '{res.url}' failed to run by returning code of {res.status_code}: {res.json()}")
@staticmethod
async def get_remote_json(resource: str) -> Dict:
"""
Shortcut for `_get_remote_resource` to return JSON response data.
:param resource: Static query identifier.
:return: Response JSON dictionary.
"""
return await DownloadManager._get_remote_resource(resource, None)
@staticmethod
async def get_remote_yaml(resource: str) -> Dict:
"""
Shortcut for `_get_remote_resource` to return YAML response data.
:param resource: Static query identifier.
:return: Response YAML dictionary.
"""
return await DownloadManager._get_remote_resource(resource, safe_load)
@staticmethod
async def get_remote_graphql(query: str, **kwargs) -> Dict:
"""
Execute a GitHub GraphQL API query.
The queries are defined in `GITHUB_API_QUERIES`; all parameters are passed as kwargs.
If the query wasn't cached previously, cache it, keyed by the query identifier plus a hash of its parameters.
NB! The response is cached before parsing, so a cached erroneous response raises again on access.
Parse and return the response as JSON.
:param query: Dynamic query identifier.
:param kwargs: Parameters for substitution of variables in dynamic query.
:return: Response JSON dictionary.
"""
key = f"{query}_{md5(dumps(kwargs, sort_keys=True).encode('utf-8')).digest()}"
if key not in DownloadManager._REMOTE_RESOURCES_CACHE:
res = await DownloadManager._client.post("https://api.github.com/graphql", json={
"query": Template(GITHUB_API_QUERIES[query]).substitute(kwargs)
})
DownloadManager._REMOTE_RESOURCES_CACHE[key] = res
else:
res = DownloadManager._REMOTE_RESOURCES_CACHE[key]
if res.status_code == 200:
return res.json()
else:
raise Exception(f"Query '{query}' failed to run by returning code of {res.status_code}: {res.json()}")

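As an aside, a minimal driver sketch for the manager above (hypothetical script, not part of this commit; it assumes valid GH_TOKEN and WAKA_KEY environment variables and that it runs next to sources/download_manager.py):

from asyncio import run
from os import environ

from github import Github

from download_manager import DownloadManager, init_download_manager

async def demo():
    user = Github(environ["GH_TOKEN"]).get_user()
    await init_download_manager(environ["WAKA_KEY"], environ["GH_TOKEN"], user)
    # Static query: awaited on first access, then served from the cache.
    week = await DownloadManager.get_remote_json("waka_latest")
    print(week["data"]["timezone"])
    # Dynamic query: cached under the query identifier plus a hash of its kwargs.
    contributed = await DownloadManager.get_remote_graphql("repositories_contributed_to", username=user.login)
    print(len(contributed["data"]["user"]["repositoriesContributedTo"]["nodes"]))

run(demo())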

@@ -1,53 +1,37 @@
import re
import os
import base64
import requests
from github import Github, InputGitAuthor
import datetime
from string import Template
import matplotlib.pyplot as plt
from io import StringIO, BytesIO
from dotenv import load_dotenv
import time
from asyncio import sleep
from github import Github, InputGitAuthor, AuthenticatedUser
import datetime
from download_manager import DownloadManager
from make_bar_graph import BarGraph
class LinesOfCode:
def __init__(self, id, username, ghtoken, repositoryData, ignored_repos):
self.id = id
self.username = username
def __init__(self, user: AuthenticatedUser, ghtoken, repositoryData, ignored_repos):
self.g = Github(ghtoken)
self.headers = {"Authorization": "Bearer " + ghtoken}
self.user = user
self.repositoryData = repositoryData
self.ignored_repos = ignored_repos
def calculateLoc(self):
async def calculateLoc(self):
result = self.repositoryData
yearly_data = {}
for repo in result['data']['user']['repositories']['edges']:
total = len(result['data']['user']['repositories']['edges'])
for ind, repo in enumerate(result['data']['user']['repositories']['edges']):
if repo['node']['name'] not in self.ignored_repos:
self.getCommitStat(repo['node'], yearly_data)
time.sleep(0.7)
print(f"{ind}/{total}", "Retrieving repo:", repo['node']["owner"]["login"], repo['node']['name'])
await self.getCommitStat(repo['node'], yearly_data)
await sleep(0.7)
return yearly_data
def plotLoc(self, yearly_data):
async def plotLoc(self, yearly_data):
graph = BarGraph(yearly_data)
graph.build_graph()
await graph.build_graph()
self.pushChart()
def run_query_v3(self, endPoint):
# print(endPoint)
request = requests.get(endPoint, headers=self.headers)
if request.status_code == 401:
raise Exception("Invalid token {}.".format(request.status_code))
elif request.status_code == 204:
return []
else:
return request.json()
def getQuarter(self, timeStamp):
month = datetime.datetime.fromisoformat(timeStamp).month
if month >= 1 and month <= 3:
@@ -59,45 +43,30 @@ class LinesOfCode:
elif month >= 10 and month <= 12:
return 4
def getCommitStat(self, repoDetails, yearly_data):
commitsURL = 'https://api.github.com/repos/' + repoDetails['nameWithOwner'] + '/commits'
filteredCommitsEndPoint = commitsURL + '?author=' + self.username
filteredCommitsResult = self.run_query_v3(filteredCommitsEndPoint)
# This ignores the error message you get when you try to list commits for an empty repository
if not type(filteredCommitsResult) == list:
async def getCommitStat(self, repoDetails, yearly_data):
commit_data = await DownloadManager.get_remote_graphql("repository_commit_list", owner=repoDetails["owner"]["login"], name=repoDetails['name'], id=self.user.node_id)
if commit_data["data"]["repository"] is None:
print("\tSkipping:", repoDetails['name'])
return
this_year = datetime.datetime.utcnow().year
for i in range(len(filteredCommitsResult)):
iso_date = filteredCommitsResult[i]["commit"]["author"]["date"]
date = re.search(r'\d+-\d+-\d+', iso_date).group(0)
for commit in [commit["node"] for branch in commit_data["data"]["repository"]["refs"]["edges"] for commit in branch["node"]["target"]["history"]["edges"]]:
date = re.search(r'\d+-\d+-\d+', commit["committedDate"]).group(0)
curr_year = datetime.datetime.fromisoformat(date).year
# if curr_year != this_year:
individualCommitEndPoint = commitsURL + '/' + filteredCommitsResult[i]["sha"]
individualCommitResult = self.run_query_v3(individualCommitEndPoint)
quarter = self.getQuarter(date)
if repoDetails['primaryLanguage'] is not None:
if repoDetails['primaryLanguage'] is not None:
if curr_year not in yearly_data:
yearly_data[curr_year] = {}
if quarter not in yearly_data[curr_year]:
yearly_data[curr_year][quarter] = {}
if repoDetails['primaryLanguage']['name'] not in yearly_data[curr_year][quarter]:
yearly_data[curr_year][quarter][repoDetails['primaryLanguage']['name']] = 0
yearly_data[curr_year][quarter][repoDetails['primaryLanguage']['name']] += (individualCommitResult["stats"]["additions"] - individualCommitResult["stats"]['deletions'])
yearly_data[curr_year][quarter][repoDetails['primaryLanguage']['name']] += (commit["additions"] - commit["deletions"])
# to find total
# if 'total' not in yearly_data[curr_year]:
# yearly_data[curr_year]['total']={}
# if repoDetails['primaryLanguage']['name'] not in yearly_data[curr_year]['total']:
# yearly_data[curr_year]['total'][repoDetails['primaryLanguage']['name']]=0
# yearly_data[curr_year]['total'][repoDetails['primaryLanguage']['name']]+=(result[i][1]+result[i][2])
def pushChart(self):
repo = self.g.get_repo(f"{self.username}/{self.username}")
repo = self.g.get_repo(f"{self.user.login}/{self.user.login}")
committer = InputGitAuthor('readme-bot', '41898282+github-actions[bot]@users.noreply.github.com')
with open('bar_graph.png', 'rb') as input_file:
data = input_file.read()

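For orientation, a toy sketch of the accumulator that getCommitStat fills (hypothetical values; yearly_data maps year to quarter to primary language to net lines added):

# Hypothetical illustration of the yearly_data shape built above.
yearly_data = {}
commit = {"committedDate": "2023-02-16T22:27:35Z", "additions": 120, "deletions": 45}
language = "Python"  # repoDetails['primaryLanguage']['name']
year, month = 2023, 2
quarter = (month - 1) // 3 + 1  # same bucketing as getQuarter()
yearly_data.setdefault(year, {}).setdefault(quarter, {}).setdefault(language, 0)
yearly_data[year][quarter][language] += commit["additions"] - commit["deletions"]
assert yearly_data == {2023: {1: {"Python": 75}}}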

@@ -4,20 +4,19 @@ Readme Development Metrics With waka time progress
import re
import os
import base64
from asyncio import run
from typing import Dict
from pytz import timezone
import pytz
import requests
from github import Github, GithubException, InputGitAuthor
from github import Github, InputGitAuthor, AuthenticatedUser
import datetime
from string import Template
from download_manager import init_download_manager, DownloadManager
from loc import LinesOfCode
import time
import traceback
import humanize
from urllib.parse import quote
import json
import sys
from datetime import date
import math
from dotenv import load_dotenv
@@ -54,107 +53,11 @@ commit_email = os.getenv('INPUT_COMMIT_EMAIL')
show_total_code_time = os.getenv('INPUT_SHOW_TOTAL_CODE_TIME')
symbol_version = os.getenv('INPUT_SYMBOL_VERSION').strip()
show_waka_stats = 'y'
# The GraphQL query to get commit data.
userInfoQuery = """
{
viewer {
login
email
id
}
}
"""
createContributedRepoQuery = Template("""query {
user(login: "$username") {
repositoriesContributedTo(last: 100, includeUserRepositories: true) {
nodes {
isFork
name
owner {
login
}
}
}
}
}
""")
createCommittedDateQuery = Template("""
query {
repository(owner: "$owner", name: "$name") {
defaultBranchRef {
target {
... on Commit {
history(first: 100, author: { id: "$id" }) {
edges {
node {
committedDate
}
}
}
}
}
}
}
}
""")
get_loc_url = Template("""/repos/$owner/$repo/stats/code_frequency""")
get_profile_view = Template("""/repos/$owner/$repo/traffic/views?per=week""")
get_profile_traffic = Template("""/repos/$owner/$repo/traffic/popular/referrers""")
truthy = ['true', '1', 't', 'y', 'yes']
def run_v3_api(query):
request = requests.get('https://api.github.com' + query, headers=headers)
if request.status_code == 200:
return request.json()
else:
raise Exception(
"Query failed to run by returning code of {}. {},... {}".format(request.status_code, query,
str(request.json())))
repositoryListQuery = Template("""
{
user(login: "$username") {
repositories(orderBy: {field: CREATED_AT, direction: ASC}, last: 100, affiliations: [OWNER, COLLABORATOR, ORGANIZATION_MEMBER], isFork: false) {
totalCount
edges {
node {
object(expression:"master") {
... on Commit {
history (author: { id: "$id" }){
totalCount
}
}
}
primaryLanguage {
color
name
id
}
stargazers {
totalCount
}
collaborators {
totalCount
}
createdAt
name
owner {
id
login
}
nameWithOwner
}
}
}
location
createdAt
name
}
}
""")
translate: Dict[str, str]
user: AuthenticatedUser
def millify(n):
@@ -168,14 +71,6 @@ def millify(n):
return '{:.0f}{}'.format(n / 10 ** (3 * millidx), millnames[millidx])
def run_query(query):
request = requests.post('https://api.github.com/graphql', json={'query': query}, headers=headers)
if request.status_code == 200:
return request.json()
else:
raise Exception("Query failed to run by returning code of {}. {}".format(request.status_code, query))
def make_graph(percent: float):
'''Make progress graph from API graph'''
if (symbol_version == '1'): # version 1
@@ -219,14 +114,10 @@ def make_commit_list(data: list):
return '\n'.join(data_list)
def generate_commit_list(tz):
async def generate_commit_list(tz):
string = ''
result = run_query(userInfoQuery) # Execute the query
username = result["data"]["viewer"]["login"]
id = result["data"]["viewer"]["id"]
# print("user {}".format(username))
result = run_query(createContributedRepoQuery.substitute(username=username))
result = await DownloadManager.get_remote_graphql("repositories_contributed_to", username=user.login)
nodes = result["data"]["user"]["repositoriesContributedTo"]["nodes"]
repos = [d for d in nodes if d['isFork'] is False]
@@ -244,42 +135,35 @@ def generate_commit_list(tz):
Sunday = 0
for repository in repos:
result = run_query(
createCommittedDateQuery.substitute(owner=repository["owner"]["login"], name=repository["name"], id=id))
try:
committed_dates = result["data"]["repository"]["defaultBranchRef"]["target"]["history"]["edges"]
for committedDate in committed_dates:
date = datetime.datetime.strptime(committedDate["node"]["committedDate"],
"%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.utc).astimezone(
timezone(tz))
hour = date.hour
weekday = date.strftime('%A')
if 6 <= hour < 12:
morning += 1
if 12 <= hour < 18:
daytime += 1
if 18 <= hour < 24:
evening += 1
if 0 <= hour < 6:
night += 1
result = await DownloadManager.get_remote_graphql("repository_committed_dates", owner=repository["owner"]["login"], name=repository["name"], id=user.node_id)
committed_dates = result["data"]["repository"]["defaultBranchRef"]["target"]["history"]["edges"]
for committedDate in committed_dates:
date = datetime.datetime.strptime(committedDate["node"]["committedDate"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.utc).astimezone(timezone(tz))
hour = date.hour
weekday = date.strftime('%A')
if 6 <= hour < 12:
morning += 1
if 12 <= hour < 18:
daytime += 1
if 18 <= hour < 24:
evening += 1
if 0 <= hour < 6:
night += 1
if weekday == "Monday":
Monday += 1
if weekday == "Tuesday":
Tuesday += 1
if weekday == "Wednesday":
Wednesday += 1
if weekday == "Thursday":
Thursday += 1
if weekday == "Friday":
Friday += 1
if weekday == "Saturday":
Saturday += 1
if weekday == "Sunday":
Sunday += 1
except Exception as ex:
if str(ex) != "'NoneType' object is not subscriptable":
print("Exception occurred " + str(ex))
if weekday == "Monday":
Monday += 1
if weekday == "Tuesday":
Tuesday += 1
if weekday == "Wednesday":
Wednesday += 1
if weekday == "Thursday":
Thursday += 1
if weekday == "Friday":
Friday += 1
if weekday == "Saturday":
Saturday += 1
if weekday == "Sunday":
Sunday += 1
sumAll = morning + daytime + evening + night
sum_week = Sunday + Monday + Tuesday + Friday + Saturday + Wednesday + Thursday
@@ -324,60 +208,54 @@ def generate_commit_list(tz):
return string
def get_waka_time_stats():
async def get_waka_time_stats():
stats = ''
request = requests.get(
f"https://wakatime.com/api/v1/users/current/stats/last_7_days?api_key={waka_key}")
no_activity = translate["No Activity Tracked This Week"]
if request.status_code == 401 or request.status_code != 200:
print("Error With WAKA time API returned " + str(request.status_code) + " Response " + str(request.json()))
else:
data = request.json()
if showCommit.lower() in truthy:
stats = stats + generate_commit_list(tz=data['data']['timezone']) + '\n\n'
data = await DownloadManager.get_remote_json("waka_latest")
if showCommit.lower() in truthy:
stats = stats + await generate_commit_list(data['data']['timezone']) + '\n\n'
if showTimeZone.lower() in truthy or showLanguage.lower() in truthy or showEditors.lower() in truthy or \
showProjects.lower() in truthy or showOs.lower() in truthy:
stats += '📊 **' + translate['This Week I Spend My Time On'] + '** \n\n'
stats += '```text\n'
if showTimeZone.lower() in truthy or showLanguage.lower() in truthy or showEditors.lower() in truthy or showProjects.lower() in truthy or showOs.lower() in truthy:
stats += '📊 **' + translate['This Week I Spend My Time On'] + '** \n\n'
stats += '```text\n'
if showTimeZone.lower() in truthy:
tzone = data['data']['timezone']
stats = stats + '⌚︎ ' + translate['Timezone'] + ': ' + tzone + '\n\n'
if showTimeZone.lower() in truthy:
tzone = data['data']['timezone']
stats = stats + '⌚︎ ' + translate['Timezone'] + ': ' + tzone + '\n\n'
if showLanguage.lower() in truthy:
if len(data['data']['languages']) == 0:
lang_list = no_activity
else:
lang_list = make_list(data['data']['languages'])
stats = stats + '💬 ' + translate['Languages'] + ': \n' + lang_list + '\n\n'
if showLanguage.lower() in truthy:
if len(data['data']['languages']) == 0:
lang_list = no_activity
else:
lang_list = make_list(data['data']['languages'])
stats = stats + '💬 ' + translate['Languages'] + ': \n' + lang_list + '\n\n'
if showEditors.lower() in truthy:
if len(data['data']['editors']) == 0:
edit_list = no_activity
else:
edit_list = make_list(data['data']['editors'])
stats = stats + '🔥 ' + translate['Editors'] + ': \n' + edit_list + '\n\n'
if showEditors.lower() in truthy:
if len(data['data']['editors']) == 0:
edit_list = no_activity
else:
edit_list = make_list(data['data']['editors'])
stats = stats + '🔥 ' + translate['Editors'] + ': \n' + edit_list + '\n\n'
if showProjects.lower() in truthy:
if len(data['data']['projects']) == 0:
project_list = no_activity
else:
# Re-order the project list by percentage
data['data']['projects'] = sorted(data['data']['projects'], key=lambda x: x["percent"],
if showProjects.lower() in truthy:
if len(data['data']['projects']) == 0:
project_list = no_activity
else:
# Re-order the project list by percentage
data['data']['projects'] = sorted(data['data']['projects'], key=lambda x: x["percent"],
reverse=True)
project_list = make_list(data['data']['projects'])
stats = stats + '🐱‍💻 ' + translate['Projects'] + ': \n' + project_list + '\n\n'
project_list = make_list(data['data']['projects'])
stats = stats + '🐱‍💻 ' + translate['Projects'] + ': \n' + project_list + '\n\n'
if showOs.lower() in truthy:
if len(data['data']['operating_systems']) == 0:
os_list = no_activity
else:
os_list = make_list(data['data']['operating_systems'])
stats = stats + '💻 ' + translate['operating system'] + ': \n' + os_list + '\n\n'
if showOs.lower() in truthy:
if len(data['data']['operating_systems']) == 0:
os_list = no_activity
else:
os_list = make_list(data['data']['operating_systems'])
stats = stats + '💻 ' + translate['operating system'] + ': \n' + os_list + '\n\n'
stats += '```\n\n'
stats += '```\n\n'
return stats
@@ -389,14 +267,12 @@ def generate_language_per_repo(result):
if repo['node']['primaryLanguage'] is None:
continue
language = repo['node']['primaryLanguage']['name']
color_code = repo['node']['primaryLanguage']['color']
total += 1
if language not in language_count.keys():
language_count[language] = {}
language_count[language]['count'] = 1
else:
language_count[language]['count'] = language_count[language]['count'] + 1
language_count[language]['color'] = color_code
data = []
sorted_labels = list(language_count.keys())
sorted_labels.sort(key=lambda x: language_count[x]['count'], reverse=True)
@@ -416,44 +292,43 @@ def generate_language_per_repo(result):
return '**' + title + '** \n\n' + '```text\n' + make_list(data) + '\n\n```\n'
def get_yearly_data():
repository_list = run_query(repositoryListQuery.substitute(username=username, id=id))
loc = LinesOfCode(id, username, ghtoken, repository_list, ignored_repos_name)
yearly_data = loc.calculateLoc()
async def get_yearly_data():
repository_list = await DownloadManager.get_remote_graphql("user_repository_list", username=user.login, id=user.node_id)
loc = LinesOfCode(user, ghtoken, repository_list, ignored_repos_name)
yearly_data = await loc.calculateLoc()
if showLocChart.lower() in truthy:
loc.plotLoc(yearly_data)
await loc.plotLoc(yearly_data)
return yearly_data
def get_line_of_code():
repositoryList = run_query(repositoryListQuery.substitute(username=username, id=id))
loc = LinesOfCode(id, username, ghtoken, repositoryList, ignored_repos_name)
yearly_data = loc.calculateLoc()
async def get_line_of_code() -> str:
repositoryList = await DownloadManager.get_remote_graphql("user_repository_list", username=user.login, id=user.node_id)
loc = LinesOfCode(user, ghtoken, repositoryList, ignored_repos_name)
yearly_data = await loc.calculateLoc()
total_loc = sum(
[yearly_data[year][quarter][lang] for year in yearly_data for quarter in yearly_data[year] for lang in
yearly_data[year][quarter]])
return millify(int(total_loc))
def get_short_info(github):
async def get_short_info():
string = '**🐱 ' + translate['My GitHub Data'] + '** \n\n'
user_info = github.get_user()
if user_info.disk_usage is None:
if user.disk_usage is None:
disk_usage = humanize.naturalsize(0)
print("Please add new github personal access token with user permission")
else:
disk_usage = humanize.naturalsize(user_info.disk_usage)
request = requests.get('https://github-contributions.vercel.app/api/v1/' + user_info.login)
if request.status_code == 200 and len(request.json()['years']) > 0:
this_year_data = request.json()['years'][0]
disk_usage = humanize.naturalsize(user.disk_usage)
data = await DownloadManager.get_remote_json("github_stats")
if len(data['years']) > 0:
this_year_data = data['years'][0]
total = this_year_data['total']
year = this_year_data['year']
string += '> 🏆 ' + translate['Contributions in the year'] % (humanize.intcomma(total), year) + '\n > \n'
string += '> 📦 ' + translate["Used in GitHub's Storage"] % disk_usage + ' \n > \n'
is_hireable = user_info.hireable
public_repo = user_info.public_repos
private_repo = user_info.owned_private_repos
is_hireable = user.hireable
public_repo = user.public_repos
private_repo = user.owned_private_repos
if private_repo is None:
private_repo = 0
if is_hireable:
@@ -471,52 +346,45 @@ def get_short_info(github):
return string
def get_stats(github):
async def get_stats(github) -> str:
'''Gets API data and returns markdown progress'''
stats = ''
repositoryList = run_query(repositoryListQuery.substitute(username=username, id=id))
repositoryList = await DownloadManager.get_remote_graphql("user_repository_list", username=user.login, id=user.node_id)
if show_loc.lower() in truthy or showLocChart.lower() in truthy:
# Lines of code are calculated here only once, since the computation is heavy; reusing the result reduces execution time
yearly_data = get_yearly_data()
await get_yearly_data()
if show_total_code_time.lower() in truthy:
request = requests.get(
f"https://wakatime.com/api/v1/users/current/all_time_since_today?api_key={waka_key}")
if request.status_code == 401:
print("Error With WAKA time API returned " + str(request.status_code) + " Response " + str(request.json()))
elif "text" not in request.json()["data"]:
print("User stats are calculating. Try again later.")
else:
data = request.json()
stats += '![Code Time](http://img.shields.io/badge/' + quote(
str("Code Time")) + '-' + quote(str(
data['data']['text'])) + '-blue)\n\n'
data = await DownloadManager.get_remote_json("waka_all")
stats += '![Code Time](http://img.shields.io/badge/' + quote(
str("Code Time")) + '-' + quote(str(
data['data']['text'])) + '-blue)\n\n'
if show_profile_view.lower() in truthy:
data = run_v3_api(get_profile_view.substitute(owner=username, repo=username))
data = github.get_repo(f"{user.login}/{user.login}").get_views_traffic(per="week")
stats += '![Profile Views](http://img.shields.io/badge/' + quote(str(translate['Profile Views'])) + '-' + str(
data['count']) + '-blue)\n\n'
if show_loc.lower() in truthy:
stats += '![Lines of code](https://img.shields.io/badge/' + quote(
str(translate['From Hello World I have written'])) + '-' + quote(
str(get_line_of_code())) + '%20' + quote(str(translate['Lines of code'])) + '-blue)\n\n'
str(await get_line_of_code())) + '%20' + quote(str(translate['Lines of code'])) + '-blue)\n\n'
if show_short_info.lower() in truthy:
stats += get_short_info(github)
stats += await get_short_info()
if show_waka_stats.lower() in truthy:
stats += get_waka_time_stats()
stats += await get_waka_time_stats()
if showLanguagePerRepo.lower() in truthy:
stats = stats + generate_language_per_repo(repositoryList) + '\n\n'
if showLocChart.lower() in truthy:
stats += '**' + translate['Timeline'] + '**\n\n'
branch_name = github.get_repo(f'{username}/{username}').default_branch
stats = stats + '![Chart not found](https://raw.githubusercontent.com/' + username + '/' + username + '/' + branch_name + '/charts/bar_graph.png) \n\n'
branch_name = github.get_repo(f'{user.login}/{user.login}').default_branch
stats = stats + '![Chart not found](https://raw.githubusercontent.com/' + user.login + '/' + user.login + '/' + branch_name + '/charts/bar_graph.png) \n\n'
if show_updated_date.lower() in truthy:
now = datetime.datetime.utcnow()
@@ -526,10 +394,6 @@ def get_stats(github):
return stats
# def star_me():
# requests.put("https://api.github.com/user/starred/anmol098/waka-readme-stats", headers=headers)
def decode_readme(data: str):
'''Decode the contents of old readme'''
decoded_bytes = base64.b64decode(data)
@@ -542,52 +406,52 @@ def generate_new_readme(stats: str, readme: str):
return re.sub(listReg, stats_in_readme, readme)
if __name__ == '__main__':
async def main():
global translate, user
if ghtoken is None:
raise Exception('Token not available')
user = Github(ghtoken).get_user()
print(f"Current user: {user.login}")
await init_download_manager(waka_key, ghtoken, user)
try:
start_time = datetime.datetime.now().timestamp() * 1000
if ghtoken is None:
raise Exception('Token not available')
g = Github(ghtoken)
headers = {"Authorization": "Bearer " + ghtoken}
user_data = run_query(userInfoQuery) # Execute the query
if "errors" in user_data:
raise Exception(user_data)
username = user_data["data"]["viewer"]["login"]
id = user_data["data"]["viewer"]["id"]
email = user_data["data"]["viewer"]["email"]
print("Username " + username)
repo = g.get_repo(f"{username}/{username}")
contents = repo.get_readme()
try:
with open(os.path.join(os.path.dirname(__file__), 'translation.json'), encoding='utf-8') as config_file:
data = json.load(config_file)
translate = data[locale]
except Exception as e:
print("Cannot find the Locale choosing default to english")
translate = data['en']
waka_stats = get_stats(g)
# star_me()
rdmd = decode_readme(contents.content)
new_readme = generate_new_readme(stats=waka_stats, readme=rdmd)
if commit_by_me.lower() in truthy:
committer = InputGitAuthor(username or commit_username, email or commit_email)
else:
committer = InputGitAuthor(
commit_username or 'readme-bot',
commit_email or '41898282+github-actions[bot]@users.noreply.github.com'
)
if new_readme != rdmd:
try:
repo.update_file(path=contents.path, message=commit_message,
content=new_readme, sha=contents.sha, branch=branchName,
committer=committer)
except:
repo.update_file(path=contents.path, message=commit_message,
content=new_readme, sha=contents.sha, branch='main',
committer=committer)
print("Readme updated")
end_time = datetime.datetime.now().timestamp() * 1000
print("Program processed in {} miliseconds.".format(round(end_time - start_time, 0)))
with open(os.path.join(os.path.dirname(__file__), 'translation.json'), encoding='utf-8') as config_file:
data = json.load(config_file)
translate = data[locale]
except Exception as e:
traceback.print_exc()
print("Exception Occurred " + str(e))
print("Cannot find the Locale choosing default to english")
translate = data['en']
g = Github(ghtoken)
waka_stats = await get_stats(g)
repo = g.get_repo(f"{user.login}/{user.login}")
contents = repo.get_readme()
rdmd = decode_readme(contents.content)
new_readme = generate_new_readme(stats=waka_stats, readme=rdmd)
if commit_by_me.lower() in truthy:
committer = InputGitAuthor(user.login or commit_username, user.email or commit_email)
else:
committer = InputGitAuthor(
commit_username or 'readme-bot',
commit_email or '41898282+github-actions[bot]@users.noreply.github.com'
)
if new_readme != rdmd:
try:
repo.update_file(path=contents.path, message=commit_message,
content=new_readme, sha=contents.sha, branch=branchName,
committer=committer)
except:
repo.update_file(path=contents.path, message=commit_message,
content=new_readme, sha=contents.sha, branch='main',
committer=committer)
print("Readme updated")
if __name__ == '__main__':
start_time = datetime.datetime.now().timestamp() * 1000
run(main())
end_time = datetime.datetime.now().timestamp() * 1000
print(f"Program processed in {round(end_time - start_time, 0)} miliseconds.")

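The hour bucketing in generate_commit_list reduces to a single comparison chain; a standalone restatement (hypothetical timestamp and timezone, illustration only):

import datetime

import pytz
from pytz import timezone

stamp = datetime.datetime.strptime("2023-02-16T22:27:35Z", "%Y-%m-%dT%H:%M:%SZ")
local = stamp.replace(tzinfo=pytz.utc).astimezone(timezone("Europe/Berlin"))
# 0-5 -> night, 6-11 -> morning, 12-17 -> daytime, 18-23 -> evening
bucket = ("night", "morning", "daytime", "evening")[local.hour // 6]
print(local.strftime("%A"), bucket)  # Thursday evening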

@@ -1,106 +1,105 @@
import os
import pandas as pd
import numpy as np
import altair as alt
import json
# npm install vega-lite vega-cli canvas
class BarGraph:
def __init__(self, yearly_data):
self.yearly_data = yearly_data
def build_graph(self):
with open(os.path.join(os.path.dirname(__file__), 'colors.json')) as f:
colors = json.load(f)
allColorsValues = []
# filter data
max_languages = 5
top_languages = {}
for year in self.yearly_data.keys():
for quarter in self.yearly_data[year].keys():
for language in sorted(list(self.yearly_data[year][quarter].keys()),
key=lambda lang: self.yearly_data[year][quarter][lang], reverse=True)[
0:max_languages]:
if 'top' not in self.yearly_data[year][quarter]:
self.yearly_data[year][quarter]['top'] = {}
if self.yearly_data[year][quarter][language] != 0:
self.yearly_data[year][quarter]['top'][language] = self.yearly_data[year][quarter][language]
if language not in top_languages:
top_languages[language] = 1
top_languages[language] += 1
# print(self.yearly_data)
all_languages = list(top_languages.keys())
for language in all_languages:
if colors[language]['color'] is not None:
allColorsValues.append(colors[language]['color'])
languages_all_loc = {}
for language in all_languages:
language_year = []
for year in self.yearly_data.keys():
language_quarter = [0, 0, 0, 0]
for quarter in self.yearly_data[year].keys():
if language in self.yearly_data[year][quarter]['top']:
language_quarter[quarter - 1] = self.yearly_data[year][quarter]['top'][language]
else:
language_quarter[quarter - 1] = 0
language_year.append(language_quarter)
languages_all_loc[language] = language_year
# print(languages_all_loc)
language_df = {}
def prep_df(df, name):
df = df.stack().reset_index()
df.columns = ['c1', 'c2', 'values']
df['Language'] = name
return df
for language in languages_all_loc.keys():
language_df[language] = pd.DataFrame(languages_all_loc[language], index=list(self.yearly_data.keys()),
columns=["Q1", "Q2", "Q3", "Q4"])
for language in language_df.keys():
language_df[language] = prep_df(language_df[language], language)
df = pd.concat(language_df.values())
chart = alt.Chart(df).mark_bar().encode(
# tell Altair which field to group columns on
x=alt.X('c2:N', title=None),
# tell Altair which field to use as Y values and how to aggregate them
y=alt.Y('sum(values):Q',
axis=alt.Axis(
grid=False,
title='LOC added')),
# tell Altair which field to use as the set of columns to be represented in each group
column=alt.Column('c1:N', title=None),
# tell Altair which field to use for color segmentation
color=alt.Color('Language:N',
scale=alt.Scale(
domain=all_languages,
# make it look pretty with an enjoyable color palette
range=allColorsValues,
),
)) \
.configure_view(
# remove grid lines around column clusters
strokeOpacity=0
)
chart.save('bar_graph.png')
return 'bar_graph.png'
import pandas as pd
import altair as alt
from download_manager import DownloadManager
# npm install vega-lite vega-cli canvas
class BarGraph:
def __init__(self, yearly_data):
self.yearly_data = yearly_data
async def build_graph(self):
colors = await DownloadManager.get_remote_yaml("linguist")
allColorsValues = []
# filter data
max_languages = 5
top_languages = {}
for year in self.yearly_data.keys():
for quarter in self.yearly_data[year].keys():
for language in sorted(list(self.yearly_data[year][quarter].keys()),
key=lambda lang: self.yearly_data[year][quarter][lang], reverse=True)[
0:max_languages]:
if 'top' not in self.yearly_data[year][quarter]:
self.yearly_data[year][quarter]['top'] = {}
if self.yearly_data[year][quarter][language] != 0:
self.yearly_data[year][quarter]['top'][language] = self.yearly_data[year][quarter][language]
if language not in top_languages:
top_languages[language] = 1
top_languages[language] += 1
# print(self.yearly_data)
all_languages = list(top_languages.keys())
for language in all_languages:
if colors[language]['color'] is not None:
allColorsValues.append(colors[language]['color'])
languages_all_loc = {}
for language in all_languages:
language_year = []
for year in self.yearly_data.keys():
language_quarter = [0, 0, 0, 0]
for quarter in self.yearly_data[year].keys():
if language in self.yearly_data[year][quarter]['top']:
language_quarter[quarter - 1] = self.yearly_data[year][quarter]['top'][language]
else:
language_quarter[quarter - 1] = 0
language_year.append(language_quarter)
languages_all_loc[language] = language_year
# print(languages_all_loc)
language_df = {}
def prep_df(df, name):
df = df.stack().reset_index()
df.columns = ['c1', 'c2', 'values']
df['Language'] = name
return df
for language in languages_all_loc.keys():
language_df[language] = pd.DataFrame(languages_all_loc[language], index=list(self.yearly_data.keys()),
columns=["Q1", "Q2", "Q3", "Q4"])
for language in language_df.keys():
language_df[language] = prep_df(language_df[language], language)
df = pd.concat(language_df.values())
chart = alt.Chart(df).mark_bar().encode(
# tell Altair which field to group columns on
x=alt.X('c2:N', title=None),
# tell Altair which field to use as Y values and how to aggregate them
y=alt.Y('sum(values):Q',
axis=alt.Axis(
grid=False,
title='LOC added')),
# tell Altair which field to use as the set of columns to be represented in each group
column=alt.Column('c1:N', title=None),
# tell Altair which field to use for color segmentation
color=alt.Color('Language:N',
scale=alt.Scale(
domain=all_languages,
# make it look pretty with an enjoyable color palette
range=allColorsValues,
),
)) \
.configure_view(
# remove grid lines around column clusters
strokeOpacity=0
)
chart.save('bar_graph.png')
return 'bar_graph.png'
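Finally, a small worked example of what prep_df produces for one language (hypothetical data; one year, four quarters):

import pandas as pd

frame = pd.DataFrame([[10, 0, 5, 0]], index=[2023], columns=["Q1", "Q2", "Q3", "Q4"])
out = frame.stack().reset_index()
out.columns = ["c1", "c2", "values"]  # year, quarter, LOC added
out["Language"] = "Python"
print(out.head(2))
#      c1  c2  values Language
# 0  2023  Q1      10   Python
# 1  2023  Q2       0   Python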