Source code for akride.core.entities.jobs

"""
 Copyright (C) 2024, Akridata, Inc - All Rights Reserved.
 Unauthorized copying of this file, via any medium, is strictly prohibited
"""
from typing import Dict

import akridata_dsp as dsp

from akride.core.entities.datasets import Dataset
from akride.core.entities.entity import Entity
from akride.core.types import AnalyzeJobParams, CatalogTable, PlotFeaturizer

from akride.core.enums import (  # isort:skip
    ClusterAlgoType,
    EmbedAlgoType,
    JobType,
)


class JobSpec(Dict):
    """
    Class representing a job specification.

    TODO: separate specs for different job types
    """

    def __init__(self, dataset: Dataset, **kwargs):
        """
        Constructor for the JobSpec class.

        Parameters
        ----------
        dataset : Dataset
            The dataset to explore.
        job_type : str
            The job type, e.g. 'EXPLORE' or 'ANALYZE'.
        job_name : str, optional
            The name of the job to create. A unique name is generated if
            this is not given.
        predictions_file : str, optional
            The path to the catalog file containing predictions and ground
            truth. This file must be formatted according to the
            specification at:
            https://docs.akridata.ai/docs/analyze-job-creation-and-visualization
        cluster_algo : ClusterAlgoType, optional
            The clustering algorithm to use.
        embed_algo : EmbedAlgoType, optional
            The embedding algorithm to use.
        num_clusters : int, optional
            The number of clusters to create.
        pipeline : Pipeline
            Pipeline information.
        max_images : int, optional
            The maximum number of images to use.
        catalog_table : CatalogTable, optional
            The catalog to be used for creating this explore job. Defaults
            to the internal primary catalog that is created automatically
            when a dataset is created.
        filters : List[Condition], optional
            The filters used to select a subset of examples for this job.
            These filters are applied to the catalog specified by
            catalog_table.
        analyze_params : AnalyzeJobParams, optional
            Additional parameters for an analyze job.
        """
        defaults = {
            "dataset": dataset,
            "job_type": JobType.EXPLORE,
            "predictions_file": "",
            "job_name": "",
            "cluster_algo": ClusterAlgoType.HDBSCAN,
            "embed_algo": EmbedAlgoType.UMAP,
            "num_clusters": None,
            "max_images": 1000,
            "catalog_table": CatalogTable(table_name="primary"),
            "filters": None,
            "analyze_params": None,
            "pipeline": None,
        }
        super().__init__()
        self.update(defaults)
        self.update(kwargs)

        if not self["job_name"]:
            # TODO: generate a unique name
            self["job_name"] = "DEFAULT-JOB-NAME"

        if (
            JobType.is_analyze_job(job_type=self["job_type"])
            and self["analyze_params"] is not None
        ):
            params: AnalyzeJobParams = self["analyze_params"]
            # Classification jobs plot by label; segmentation and object
            # detection jobs plot by content.
            if self["job_type"] == JobType.ANALYZE_CLASSIFICATION:
                params.plot_featurizer = PlotFeaturizer.LABEL
            if self["job_type"] in (
                JobType.ANALYZE_SEGMENTATION,
                JobType.ANALYZE_OBJECT_DETECTION,
            ):
                params.plot_featurizer = PlotFeaturizer.CONTENT

        if self["pipeline"] is None:
            raise ValueError("Pipeline is not specified")
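Because JobSpec is a plain dict subclass, building one amounts to overriding whichever defaults matter for the job at hand. A minimal sketch, assuming `dataset` and `pipeline` are a Dataset and a pipeline object already fetched through the SDK (both names are placeholders, not defined in this module):

from akride.core.entities.jobs import JobSpec
from akride.core.enums import ClusterAlgoType, JobType

# Illustrative only: `dataset` and `pipeline` are assumed to exist.
spec = JobSpec(
    dataset,
    job_type=JobType.EXPLORE,
    job_name="my-explore-job",  # hypothetical name
    cluster_algo=ClusterAlgoType.HDBSCAN,
    max_images=500,
    pipeline=pipeline,  # the constructor raises ValueError without one
)
assert spec["max_images"] == 500  # ordinary dict access works

Since overrides flow straight through self.update(kwargs), any key not listed in the defaults is stored verbatim, so keyword names must be spelled exactly as documented.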
class Job(Entity):
    """
    Class representing a job entity.
    """

    def __init__(self, info: dsp.CreateJobRequestResponse):
        """
        Constructor for the Job class.

        Parameters
        ----------
        info : dsp.CreateJobRequestResponse
            The job request response object.
        """
        super().__init__("", "")
        self.info = info
    def delete(self) -> None:
        """
        Deletes this entity.

        Returns
        -------
        None
        """
        return None
    @property
    def dataset_id(self):
        """The ID of the dataset this job belongs to."""
        return self.info.dataset_id  # type: ignore

    @property
    def pipeline_id(self):
        """The ID of the pipeline associated with this job."""
        return self.info.pipeline_id  # type: ignore
    def get_num_clusters(self) -> int:
        """Get the default number of clusters available for visualization.

        :raises ValueError: If job details are not available
        :return: int
        """
        if self.info:
            return self.info.to_dict()["tunables_default"]["nclusters"]
        raise ValueError("Job details are not available")
    def get_max_clusters(self) -> int:
        """Get the total number of clusters available for visualization.

        :raises ValueError: If job details are not available
        :return: int
        """
        if self.info:
            return self.info.to_dict()["tunables_default"]["max_clusters"]
        raise ValueError("Job details are not available")
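The two cluster getters are typically used together, for example to keep a caller-requested cluster count within the range a job actually supports. A hedged sketch, assuming `job` is a Job returned by the SDK with its info payload populated (the variable and the desired count are placeholders):

# Illustrative only: `job` is assumed to be a populated Job instance.
try:
    default_n = job.get_num_clusters()  # server-suggested default
    max_n = job.get_max_clusters()      # upper bound for visualization
    desired = 12                        # hypothetical caller preference
    requested = min(desired, max_n)     # never exceed the supported maximum
    print(f"dataset={job.dataset_id}: default={default_n}, using {requested}")
except ValueError:
    # Job details are not available yet; fall back to a fixed count.
    requested = 8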