refactoring

This commit is contained in:
2024-07-02 19:29:58 +02:00
parent 51a0a8ab4f
commit 458333a011
6 changed files with 192 additions and 140 deletions

View File

@@ -1,5 +1,6 @@
from enum import Enum
class DeadlineStatus(Enum):
FINE = 1
OVERDUE = 2

View File

@@ -31,9 +31,7 @@ with open(CONFIG_PATH, "rb") as f:
DATABASE_PATH = utils.get_project_root() / _config["database"]["path"]
app = typer.Typer(
add_completion=False, no_args_is_help=True
)
app = typer.Typer(add_completion=False, no_args_is_help=True)
console = Console()
@@ -121,7 +119,9 @@ def upload_things_to_reclaim(dry_run : bool = False):
tasks = things_handler.get_all_things_tasks()
with UploadedTasksDB(DATABASE_PATH) as db:
uploaded_task_ids = db.get_all_uploaded_tasks()
tasks_to_upload = [task for task in tasks if task["uuid"] not in uploaded_task_ids]
tasks_to_upload = [
task for task in tasks if task["uuid"] not in uploaded_task_ids
]
if not tasks_to_upload:
print("No new tasks were found")
else:
@@ -130,19 +130,19 @@ def upload_things_to_reclaim(dry_run : bool = False):
if not dry_run:
things_to_reclaim(task)
db.add_uploaded_task(task["uuid"])
print(f"Uploaded {len(tasks_to_upload)} task{'s' if len(tasks_to_upload) > 1 else ''}")
print(
f"Uploaded {len(tasks_to_upload)} task{'s' if len(tasks_to_upload) > 1 else ''}"
)
@app.command("list")
def list_reclaim_tasks(subject: Annotated[Optional[str],
typer.Argument()] = None):
def list_reclaim_tasks(subject: Annotated[Optional[str], typer.Argument()] = None):
"""
List all current tasks
"""
reclaim_tasks = reclaim_handler.get_reclaim_tasks()
if subject is not None:
reclaim_tasks = reclaim_handler.filter_for_subject(
subject, reclaim_tasks)
reclaim_tasks = reclaim_handler.filter_for_subject(subject, reclaim_tasks)
current_date = datetime.now(tz.tzutc())
table = Table("Index", "Task", "Days left", title="Task list")
for index, task in enumerate(reclaim_tasks):
@@ -158,10 +158,7 @@ def list_reclaim_tasks(subject: Annotated[Optional[str],
days_left = (due_date - current_date).days
date_str = Text(f"{days_left} days left")
date_str.stylize("bold white")
table.add_row(
f"({index + 1})",
task.name,
date_str)
table.add_row(f"({index + 1})", task.name, date_str)
console.print(table)
@@ -198,43 +195,42 @@ def stop_task():
utils.perror("No task is currently tracked in toggl")
return
stopped_task_name = current_task.description
current_task_name = current_task.description
if stopped_task_name is None:
if current_task_name is None:
utils.perror("Current toggl task has no name")
return
reclaim_dict = {
task.name: task for task in reclaim_handler.get_reclaim_tasks()}
reclaim_dict = {task.name: task for task in reclaim_handler.get_reclaim_tasks()}
if stopped_task_name in reclaim_dict.keys():
stopped_task = reclaim_dict[stopped_task_name]
if current_task_name in reclaim_dict.keys():
reclaim_task = reclaim_dict[current_task_name]
else:
stopped_task = utils.get_closest_match(stopped_task_name, reclaim_dict)
reclaim_task = utils.get_closest_match(current_task_name, reclaim_dict)
if stopped_task is None:
utils.perror(f"{stopped_task} not found in reclaim")
if reclaim_task is None:
utils.perror(f"{current_task_name} not found in reclaim")
return
stop_time = datetime.now(tz.tzutc())
if callable(current_task.start):
current_task.start = current_task.start()
toggl_handler.stop_current_task()
stopped_task = toggl_handler.stop_task(current_task)
if stopped_task is None:
utils.perror(f"{current_task_name} could not be stopped")
return
start_time = toggl_handler.get_start_time(stopped_task)
stop_time = toggl_handler.get_stop_time(stopped_task)
is_task_finished = Confirm.ask("Is task finished?", default=False)
if stopped_task.is_scheduled:
reclaim_handler.log_work_for_task(
stopped_task, current_task.start, stop_time)
else:
try:
reclaim_handler.log_work_for_task(reclaim_task, start_time, stop_time)
except ValueError:
utils.pwarning("Work could not be logged in reclaim!")
if is_task_finished:
finish_task(stopped_task)
rprint(f"Finished {stopped_task.name}")
finish_task(reclaim_task)
rprint(f"Finished {reclaim_task.name}")
utils.plogtime(current_task.start, stop_time, stopped_task.name)
utils.plogtime(start_time, stop_time, reclaim_task.name)
@app.command("stats")
@@ -245,20 +241,22 @@ def show_task_stats():
current_date = datetime.now(tz.tzutc())
reclaim_tasks = reclaim_handler.get_reclaim_tasks()
tasks_fine = reclaim_handler.filter_for_deadline_status(current_date, DeadlineStatus.FINE, reclaim_tasks)
tasks_overdue = reclaim_handler.filter_for_deadline_status(current_date, DeadlineStatus.OVERDUE, reclaim_tasks)
tasks_fine = reclaim_handler.filter_for_deadline_status(
current_date, DeadlineStatus.FINE, reclaim_tasks
)
tasks_overdue = reclaim_handler.filter_for_deadline_status(
current_date, DeadlineStatus.OVERDUE, reclaim_tasks
)
fine_per_course = ["Fine"]
overdue_per_course = ["Overdue"]
course_names = things_handler.get_course_names()
for course_name in course_names:
fine_per_course.append(
str(len(reclaim_handler.filter_for_subject
(course_name, tasks_fine)))
str(len(reclaim_handler.filter_for_subject(course_name, tasks_fine)))
)
overdue_per_course.append(
str(len(reclaim_handler.filter_for_subject
(course_name, tasks_overdue)))
str(len(reclaim_handler.filter_for_subject(course_name, tasks_overdue)))
)
table = Table(*(["Status"] + course_names))
@@ -269,11 +267,13 @@ def show_task_stats():
@app.command("time")
def print_time_needed():
def print_time_needed(subject: Annotated[Optional[str], typer.Argument()] = None):
"""
Print sum of time needed for all reclaim tasks
"""
tasks = reclaim_handler.get_reclaim_tasks()
if subject is not None:
tasks = reclaim_handler.filter_for_subject(subject, tasks)
time_needed = 0
for task in tasks:
@@ -282,11 +282,19 @@ def print_time_needed():
time_needed += task_duration
print(f"Time needed to complete {len(tasks)} Tasks: {time_needed} hrs")
print(f"Average time needed to complete a Task: {
time_needed/len(tasks):.2f} hrs")
print(
f"Average time needed to complete a Task: {
time_needed/len(tasks):.2f} hrs"
)
# sort unscheduled tasks to the end of the list
tasks.sort(key=lambda x: x.scheduled_start_date if x.scheduled_start_date is not None else datetime.max.replace(tzinfo=tz.tzutc()))
tasks.sort(
key=lambda x: (
x.scheduled_start_date
if x.scheduled_start_date is not None
else datetime.max.replace(tzinfo=tz.tzutc())
)
)
last_task_date = tasks[-1].scheduled_start_date
if last_task_date is None: # last task on todo list is not scheduled
@@ -305,10 +313,14 @@ def remove_finished_tasks_from_things(dry_run : bool = False):
Complete finished reclaim tasks in things
"""
reclaim_things_uuids = reclaim_handler.get_reclaim_things_ids()
tasks_to_be_removed = [task for task in things_handler.get_all_uploaded_things_tasks()
if task["uuid"] not in reclaim_things_uuids]
tasks_to_be_removed = [
task
for task in things_handler.get_all_uploaded_things_tasks()
if task["uuid"] not in reclaim_things_uuids
]
if not tasks_to_be_removed:
print("Reclaim and Things are synced!")
return 0
else:
for task in tasks_to_be_removed:
print(
@@ -317,24 +329,44 @@ def remove_finished_tasks_from_things(dry_run : bool = False):
)
if not dry_run:
finish_task(task["uuid"])
return len(tasks_to_be_removed)
@app.command("tracking")
def sync_toggl_reclaim_tracking(since_days: Annotated[int, typer.Argument()] = 0):
toggl_time_entries = toggl_handler.get_time_entries_since(since_days = since_days) # end date is inclusive
toggl_time_entries = toggl_handler.get_time_entries_since(
since_days=since_days
) # end date is inclusive
if toggl_time_entries is None:
utils.pwarning(f"No tasks tracked in Toggl since {since_days} days")
return
reclaim_time_entries = reclaim_handler.get_task_events_since(since_days = since_days) # end date is inclusive
reclaim_time_entries_dict = {utils.get_clean_time_entry_name(k) : list(g) for k,g in itertools.groupby(reclaim_time_entries, lambda r : r.name)}
non_existent_time_entries = [time_entry for time_entry in toggl_time_entries if time_entry.description not in reclaim_time_entries_dict.keys()]
reclaim_time_entries = reclaim_handler.get_task_events_since(
since_days=since_days
) # end date is inclusive
reclaim_time_entries_dict = {
utils.get_clean_time_entry_name(k): list(g)
for k, g in itertools.groupby(reclaim_time_entries, lambda r: r.name)
}
non_existent_time_entries = [
time_entry
for time_entry in toggl_time_entries
if time_entry.description not in reclaim_time_entries_dict.keys()
]
# add all existing mismatched_time_entries
time_entries_to_adjust : Dict[toggl_handler.TimeEntry, reclaim_handler.ReclaimTaskEvent] = {}
time_entries_to_adjust: Dict[
toggl_handler.TimeEntry, reclaim_handler.ReclaimTaskEvent
] = {}
for toggl_time_entry in toggl_time_entries:
if toggl_time_entry.description in reclaim_time_entries_dict.keys():
nearest_entry = utils.nearest_time_entry(reclaim_time_entries_dict.get(toggl_time_entry.description), toggl_time_entry) #pyright: ignore
nearest_entry = utils.nearest_time_entry(
reclaim_time_entries_dict.get(toggl_time_entry.description),
toggl_time_entry,
) # pyright: ignore
if toggl_time_entry in time_entries_to_adjust:
time_entries_to_adjust[toggl_time_entry] = utils.nearest_time_entry([time_entries_to_adjust[toggl_time_entry], nearest_entry], toggl_time_entry) #pyright: ignore
time_entries_to_adjust[toggl_time_entry] = utils.nearest_time_entry(
[time_entries_to_adjust[toggl_time_entry], nearest_entry],
toggl_time_entry,
) # pyright: ignore
elif nearest_entry is not None:
time_entries_to_adjust[toggl_time_entry] = nearest_entry
@@ -343,45 +375,17 @@ def sync_toggl_reclaim_tracking(since_days : Annotated[int, typer.Argument()] =
return
if not mismatched_time_entries:
utils.pinfo("Toggl and Reclaim time entries are synced.")
else:
for time_entry in mismatched_time_entries:
name = time_entry.description
if not name:
raise ValueError("Toggl time entry has empty description")
reclaim_task = reclaim_handler.get_reclaim_task(name)
if not reclaim_task:
utils.pwarning(f"Couldn't find {time_entry.description} in Reclaim.")
continue
toggl_start = toggl_handler.get_start_time(time_entry)
toggl_stop = toggl_handler.get_stop_time(time_entry)
if toggl_start.tzinfo is None:
raise ValueError("Toggl start time is not timezone aware")
if toggl_stop.tzinfo is None:
raise ValueError("Toggl stop time is not timezone aware")
reclaim_time_entry = reclaim_time_entries_dict[reclaim_task.name]
if reclaim_time_entry is None:
reclaim_handler.log_work_for_task(reclaim_task, start, stop)
else:
reclaim_handler.adjust_time_entry(reclaim_time_entry, start, stop)
utils.plogtime(start, stop, reclaim_task.name)
@app.command("current")
def display_current_task():
current_task = toggl_handler.get_current_time_entry()
if current_task is None:
utils.perror("No task is currently tracked in toggl")
return
rprint(f"Current task: {current_task.description}\nStarted at:",
rprint(
f"Current task: {current_task.description}\nStarted at:",
f"{toggl_handler.get_start_time(current_task)
.astimezone(tz.gettz()).strftime("%H:%M")}")
.astimezone(tz.gettz()).strftime("%H:%M")}",
)
@app.command("sync")
@@ -392,9 +396,14 @@ def sync_things_and_reclaim(dry_run : bool = False):
Then upload all new tasks from things to reclaim
"""
utils.pinfo("Pulling from Reclaim")
remove_finished_tasks_from_things(dry_run)
removed_tasks = remove_finished_tasks_from_things(dry_run)
rprint("---------------------------------------------")
time.sleep(1) # stop tool from uploading recently finished tasks
time_to_wait = 2 * removed_tasks
if time_to_wait > 0:
rprint(f"Waiting {time_to_wait}s for things to mark tasks as completed")
time.sleep(
time_to_wait
) # wait for os.system call in things_handler.complete to take effect
utils.pinfo("Pushing to Reclaim")
upload_things_to_reclaim(dry_run)
rprint("---------------------------------------------")

View File

@@ -35,6 +35,7 @@ def get_reclaim_task(name : str) -> Optional[ReclaimTask]:
else:
return res[0]
def get_reclaim_tasks() -> List[ReclaimTask]:
return ReclaimTask.search()
@@ -49,13 +50,19 @@ def filter_for_subject(subject, tasks):
return [task for task in tasks if task.name.startswith(subject)]
def filter_for_deadline_status(cur_date : datetime, deadline_status : DeadlineStatus, tasks : List[ReclaimTask]):
def filter_for_deadline_status(
cur_date: datetime, deadline_status: DeadlineStatus, tasks: List[ReclaimTask]
):
# [task for task in reclaim_tasks if task.due_date >= current_date]
match deadline_status:
case DeadlineStatus.FINE:
return [task for task in tasks if (task.due_date and task.due_date >= cur_date)]
return [
task for task in tasks if (task.due_date and task.due_date >= cur_date)
]
case DeadlineStatus.OVERDUE:
return [task for task in tasks if (task.due_date and task.due_date < cur_date)]
return [
task for task in tasks if (task.due_date and task.due_date < cur_date)
]
case DeadlineStatus.NONE:
return [task for task in tasks if task.due_date is None]
@@ -64,10 +71,16 @@ def get_project(task: ReclaimTask):
return task.name.split(" ")[0]
def get_clean_time_entry_name(name: str):
return emoji.replace_emoji(name).lstrip()
def is_task_time_entry(name: str):
decoded_name = emoji.demojize(name)
# task entries start either with a :thumbs_up: or a :check_mark_button: emoji
return decoded_name.startswith(":thumbs_up:") | decoded_name.startswith(":check_mark_button:")
return decoded_name.startswith(":thumbs_up:") | decoded_name.startswith(
":check_mark_button:"
)
def get_task_events_since(since_days: int = 0) -> List[ReclaimTaskEvent]:
@@ -77,6 +90,7 @@ def get_task_events_since(since_days: int = 0) -> List[ReclaimTaskEvent]:
events = ReclaimTaskEvent.search(date_since, date_end)
return [event for event in events if is_task_time_entry(event.name)]
def get_events_date_range(from_date: date, to_date: date):
return ReclaimTaskEvent.search(from_date, to_date)
@@ -122,10 +136,13 @@ def finish_task(task: ReclaimTask):
def get_things_id(task: ReclaimTask):
things_id_match = things_id_pattern.match(task.description)
if things_id_match is None:
raise ValueError(f"things id could not be found in description of reclaim task {task.name}")
raise ValueError(
f"things id could not be found in description of reclaim task {task.name}"
)
things_id_tag: str = things_id_match.group()
return things_id_tag.split(":")[1]
def get_reclaim_things_ids() -> List[str]:
return [get_things_id(task) for task in ReclaimTask.search()]

View File

@@ -33,17 +33,18 @@ time_entry_editor = toggl_python.WorkspaceTimeEntries(
def get_time_entry(time_entry_id: int) -> toggl_python.TimeEntry:
return toggl_python.TimeEntries(auth=auth).retrieve(time_entry_id)
def delete_time_entry(time_entry_id: int) -> bool:
return time_entry_editor.delete_timeentry(time_entry_id)
def get_start_time(time_entry: toggl_python.TimeEntry):
return time_entry.start() if callable(time_entry.start) else time_entry.start
def get_stop_time(time_entry: toggl_python.TimeEntry):
if time_entry.stop is None:
return get_start_time(time_entry) + timedelta(seconds=time_entry.duration)
else:
raise ValueError(f"TimeEntry {time_entry.description} is still running")
return time_entry.stop() if callable(time_entry.stop) else time_entry.stop
@@ -115,10 +116,13 @@ def create_task_time_entry(
duration=duration,
)
if start is not None:
if start is None:
start = datetime.now(tz.tzutc())
else:
if start.tzinfo is None:
raise ValueError("start has to be timezone aware")
start = start.astimezone(tz.tzutc())
time_entry.start = start
tag = get_approriate_tag(description)
@@ -132,14 +136,21 @@ def start_task(description: str, project: str):
time_entry_editor.create(create_task_time_entry(description, project))
def stop_task(task: TimeEntry) -> TimeEntry | None:
if time_entry_editor.DETAIL_URL is None:
raise ValueError("DetailURL not set")
url = time_entry_editor.BASE_URL.join(
time_entry_editor.DETAIL_URL.format(id=task.id)
)
response = time_entry_editor.patch(url)
data = response.json()
if data:
return TimeEntry(**data)
def stop_current_task():
cur_task = get_current_time_entry()
if cur_task is None:
raise RuntimeError("No time entry is currently running")
if time_entry_editor.DETAIL_URL is None:
raise ValueError("DetailURL not set")
url = time_entry_editor.BASE_URL.join(
time_entry_editor.DETAIL_URL.format(id=cur_task.id)
)
return time_entry_editor.patch(url)
return stop_task(cur_task)

View File

@@ -18,6 +18,7 @@ pattern = re.compile(TIME_PATTERN)
T = TypeVar("T") # generic type
def calculate_time_on_unit(tag_value: str) -> float:
# This is a regex to match time in the format of 1h 30m
# Minutes are optional if hours are present
@@ -39,7 +40,11 @@ def calculate_time_on_unit(tag_value: str) -> float:
def get_start_time(toggl_time_entry: TimeEntry):
return toggl_time_entry.start() if callable(toggl_time_entry.start) else toggl_time_entry.start
return (
toggl_time_entry.start()
if callable(toggl_time_entry.start)
else toggl_time_entry.start
)
def get_stop_time(time_entry: TimeEntry):
@@ -90,17 +95,24 @@ def get_closest_match(name: str, candidates: Dict[str, T]) -> T | None:
return candidates[ListPrompt.ask("Select a candidate", possible_candidates)]
def nearest_time_entry(items: Optional[List[ReclaimTaskEvent]], pivot: TimeEntry) -> Optional[ReclaimTaskEvent]:
def nearest_time_entry(
items: Optional[List[ReclaimTaskEvent]], pivot: TimeEntry
) -> Optional[ReclaimTaskEvent]:
if not items:
return None
return min(items, key=lambda x: abs(x.start - get_start_time(pivot)))
def is_matching_time_entry(toggl_time_entry : Optional[TimeEntry], reclaim_time_entry : Optional[ReclaimTaskEvent]):
def is_matching_time_entry(
toggl_time_entry: Optional[TimeEntry],
reclaim_time_entry: Optional[ReclaimTaskEvent],
):
if toggl_time_entry is None or reclaim_time_entry is None:
print("One is none")
return False
if toggl_time_entry.description != get_clean_time_entry_name(reclaim_time_entry.name):
if toggl_time_entry.description != get_clean_time_entry_name(
reclaim_time_entry.name
):
print(f"toggl title: {toggl_time_entry.description}")
print(f"reclaim title: {get_clean_time_entry_name(reclaim_time_entry.name)}")
return False
@@ -129,6 +141,7 @@ def is_matching_time_entry(toggl_time_entry : Optional[TimeEntry], reclaim_time_
return False
return True
def pinfo(msg: str):
rprint(f"[bold white]{msg}[/bold white]")
@@ -151,10 +164,11 @@ def plogtime(start_time: datetime, end_time: datetime, task_name: str):
if end_time.tzinfo is None:
raise ValueError("end_time has to be timezone aware.")
rprint(
(f"Logged work from {start_time.astimezone(local_zone).strftime(time_format)} "
f"to {end_time.astimezone(local_zone).strftime(time_format)} for {task_name}")
(
f"Logged work from {start_time.astimezone(local_zone).strftime(time_format)} "
f"to {end_time.astimezone(local_zone).strftime(time_format)} for {task_name}"
)
)