import threading
from typing import Dict
from akride._utils.background_task_helper import BackgroundTask
from akride._utils.progress_manager.manager import ProgressManager
from akride.core.enums import BackgroundTaskType
from akride.core.exceptions import UserError
[docs]class BackgroundTaskManager:
"""Helper class to manage background task"""
def __init__(self):
self._tasks: Dict[str, BackgroundTask] = {}
self._lock = threading.Lock()
[docs] def start_task(
self,
entity_id: str,
task_type: BackgroundTaskType,
target_function,
*args,
**kwargs,
) -> BackgroundTask:
"""
Start a background task.
Parameters
----------
:param task_type: The type of the background task.
:param entity_id: Entity ID associated with the task
:param target_function: The target function to run
:param args: Arguments for the target function
:param kwargs: Keyword arguments for the target function
Returns
-------
BackgroundTask
background task object
"""
with self._lock:
if self.is_task_running(task_type=task_type, entity_id=entity_id):
raise UserError(
f"Task {task_type.value} for entity {entity_id} already "
f"in progress!"
)
task_id = self._get_task_id(
entity_id=entity_id, task_type=task_type
)
progress_manager = ProgressManager()
task = BackgroundTask(
target_function, progress_manager, *args, **kwargs
)
task.start()
self._tasks[task_id] = task
return task
[docs] def is_task_running(
self, entity_id: str, task_type: BackgroundTaskType
) -> bool:
"""
Parameters
----------
:param entity_id: Entity ID associated with the task.
:param task_type: The type of the background task.
Returns
-------
Boolean
a boolean representing whether task is running or not.
"""
task_id = self._get_task_id(entity_id=entity_id, task_type=task_type)
task = self._tasks.get(task_id)
if task:
return not task.has_completed()
return False
@staticmethod
def _get_task_id(entity_id: str, task_type: BackgroundTaskType):
return f"{task_type.value}_{entity_id}"