"""
Copyright (C) 2025, Akridata, Inc - All Rights Reserved.
Unauthorized copying of this file, via any medium is strictly prohibited
"""
import json
from contextlib import suppress
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import akridata_akrimanager_v2 as am
import akridata_dsp as dsp
import pandas as pd
import urllib3
from akridata_akrimanager_v2 import AttachmentPolicyType, PipelineDocker
from akridata_akrimanager_v2.models.condition import Condition
from PIL import Image
from pyakri_de_utils.retry_helper import get_http_retry
from yarl import URL
from akride import logger
from akride._utils.background_task_helper import BackgroundTask
from akride._utils.dataset_utils import get_dataset_type
from akride._utils.proxy_utils import get_env_proxy_for_url
from akride.background_task_manager import BackgroundTaskManager
from akride.core import constants
from akride.core._entity_managers.catalog_manager import CatalogManager
from akride.core._entity_managers.container_manager import ContainerManager
from akride.core._entity_managers.dataset_manager import DatasetManager
from akride.core._entity_managers.docker_image_manager import (
DockerImageManager,
)
from akride.core._entity_managers.job_manager import JobManager
from akride.core._entity_managers.pipeline_manager import DockerPipelineManager
from akride.core._entity_managers.repository_manager import RepositoryManager
from akride.core._entity_managers.resultset_manager import ResultsetManager
from akride.core._entity_managers.sms_manager import SMSManager
from akride.core._entity_managers.subscriptions_manager import (
SubscriptionsManager,
)
from akride.core.entities.bgc_job import BGCJob
from akride.core.entities.catalogs import Catalog
from akride.core.entities.datasets import Dataset
from akride.core.entities.docker_image import DockerImage, DockerImageSpec
from akride.core.entities.docker_pipeline import (
DockerPipeline,
DockerPipelineSpec,
)
from akride.core.entities.entity import Entity
from akride.core.entities.jobs import Job, JobSpec
from akride.core.entities.pipeline import Pipeline
from akride.core.entities.resultsets import Resultset
from akride.core.entities.sms_secrets import SMSSecrets
from akride.core.exceptions import ErrorMessages, ServerError, UserError
from akride.core.models.bc_attachment_job_status import BGCAttachmentJobStatus
from akride.core.models.catalogs.catalog_details import CatalogDetails
from akride.core.models.catalogs.catalog_view_info import CatalogViewInfo
from akride.core.models.datasets.get_unprocessed_files import (
DatasetUnprocessedFiles,
)
from akride.core.models.progress_info import ProgressInfo
from akride.core.types import (
AnalyzeJobParams,
CatalogTable,
ClientManager,
ClusterRetrievalSpec,
Column,
ConfusionMatrixCellSpec,
CoresetSamplingSpec,
JobStatistics,
JoinCondition,
SampleInfoList,
SimilaritySearchSpec,
)
from akride.core.enums import ( # isort:skip
ClusterAlgoType,
EmbedAlgoType,
JobContext,
JobStatisticsContext,
JobType,
FeaturizerType,
CatalogTableType,
DataType,
AkridataDockerNames,
)
class AkriDEClient:  # pylint:disable=R0902
"""Client class to connect to DataExplorer"""
def __init__(
self,
saas_endpoint: Optional[str] = None,
api_key: Optional[str] = None,
sdk_config_tuple: Optional[Tuple[str, str]] = None,
sdk_config_dict: Optional[dict] = None,
sdk_config_file: Optional[str] = None,
):
"""
        Initializes the AkriDEClient with the saas_endpoint and api_key values.
        The init params can be passed in different ways; if multiple
        options are used, the order of preference is:
        1. Positional params (saas_endpoint, api_key)
        2. sdk_config_tuple
        3. sdk_config_dict
        4. sdk_config_file
        Get the SDK config by signing in to the Data Explorer UI and navigating
        to Utilities → Get CLI/SDK config.
        For detailed information on how to download the SDK config, refer to
        https://docs.akridata.ai/docs/download-config-saas

        Parameters
        ----------
saas_endpoint: str
Dataexplorer endpoint, if None defaults to https://app.akridata.ai
api_key: str
api_key value obtained from Dataexplorer
sdk_config_tuple: tuple
A tuple consisting of saas_endpoint and api_key in that order
sdk_config_dict: dict
dictionary containing "saas_endpoint" and "api_key"
sdk_config_file: str
            Path to the SDK config file downloaded from Dataexplorer
Raises
---------
InvalidAuthConfigError: if api-key/host is invalid
ServerNotReachableError: if the server is unreachable
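
        Example
        -------
        A minimal sketch of two of the supported initialization paths; the
        API key and config file path below are placeholders:

        >>> client = AkriDEClient(
        ...     saas_endpoint="https://app.akridata.ai",
        ...     api_key="<your-api-key>",
        ... )
        >>> # or, using the SDK config file downloaded from Data Explorer
        >>> client = AkriDEClient(sdk_config_file="sdk_config.json")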
"""
try:
app_endpoint, access_key = self._get_auth_config(
saas_endpoint=saas_endpoint,
api_key=api_key,
sdk_config_tuple=sdk_config_tuple,
sdk_config_dict=sdk_config_dict,
sdk_config_file=sdk_config_file,
)
except Exception as ex:
raise UserError(
message=ErrorMessages.SDK_USER_ERR_01_INVALID_AUTH,
status_code=401,
) from ex
if app_endpoint is None or access_key is None:
raise UserError(
message=ErrorMessages.SDK_USER_ERR_01_INVALID_AUTH,
status_code=401,
)
proxy, proxy_headers = self._get_proxy_url_and_headers(app_endpoint)
app_endpoint: str = app_endpoint.split(sep="//")[1]
self.host = app_endpoint
self.api_key = access_key
dsp_conf = dsp.Configuration(
host=f"https://{app_endpoint}/ds-core",
)
dsp_conf.proxy = proxy # type: ignore
dsp_conf.proxy_headers = proxy_headers # type: ignore
default_retries = get_http_retry()
dsp_conf.retries = default_retries # type:ignore
dsp_client = dsp.ApiClient(
configuration=dsp_conf,
header_name="X-API-KEY",
header_value=access_key,
)
am_conf = am.Configuration(
host=f"https://{app_endpoint}/api",
)
am_conf.proxy = proxy # type: ignore
am_conf.proxy_headers = proxy_headers # type: ignore
am_conf.retries = default_retries # type:ignore
am_client = am.ApiClient(
configuration=am_conf,
header_name="X-API-KEY",
header_value=access_key,
)
task_manager = BackgroundTaskManager()
cli_manager = ClientManager(
am_client=am_client,
dsp_client=dsp_client,
background_task_manager=task_manager,
)
self.jobs = JobManager(cli_manager)
self.resultsets = ResultsetManager(cli_manager)
self.catalogs = CatalogManager(cli_manager)
self.datasets = DatasetManager(cli_manager=cli_manager)
self.subscriptions = SubscriptionsManager(cli_manager=cli_manager)
self.sms_secrets = SMSManager(cli_manager=cli_manager)
self.repository = RepositoryManager(cli_manager=cli_manager)
self.containers = ContainerManager(cli_manager=cli_manager)
self.docker_images = DockerImageManager(cli_manager=cli_manager)
self.docker_pipelines = DockerPipelineManager(cli_manager=cli_manager)
# Check if the api-key is valid
self.subscriptions.get_server_version()
logger.debug("AkriDEClient initialized")
def _get_auth_config(
self,
saas_endpoint,
api_key,
sdk_config_tuple,
sdk_config_dict,
sdk_config_file,
):
saas_ep, auth_key = None, None
if api_key:
if saas_endpoint is None:
saas_ep = constants.Constants.DEFAULT_SAAS_ENDPOINT
else:
saas_ep = saas_endpoint
auth_key = api_key
elif sdk_config_tuple:
saas_ep, auth_key = sdk_config_tuple
elif sdk_config_dict:
saas_ep, auth_key = (
sdk_config_dict["saas_endpoint"],
sdk_config_dict["api_key"],
)
elif sdk_config_file:
with open(sdk_config_file, "r", encoding="utf-8") as api_conf:
auth_config = json.load(api_conf)
saas_ep, auth_key = (
auth_config["saas_endpoint"],
auth_config["api_key"],
)
else:
            raise TypeError(
                "AkriDEClient initialization requires one of the following "
                "options: 'api_key' (optionally with 'saas_endpoint'), "
                "'sdk_config_tuple', 'sdk_config_dict' or 'sdk_config_file'"
            )
return saas_ep, auth_key
def _get_proxy_url_and_headers(
self, host
) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
        with suppress(LookupError):
            url = URL(host)
            proxy_url, proxy_basic_auth = get_env_proxy_for_url(url)
            if proxy_basic_auth:
                return proxy_url.human_repr(), urllib3.make_headers(
                    proxy_basic_auth=proxy_basic_auth
                )
            return proxy_url.human_repr(), None
        return None, None
    def get_server_version(self) -> str:
"""Get Dataexplorer server version
Returns:
str: server version
"""
return self.subscriptions.get_server_version()
#
# Container API
#
    def get_containers(
self, attributes: Optional[Dict[str, Any]] = None
) -> List[Entity]:
"""
Retrieves information about containers that have the given attributes.
Parameters
----------
attributes: Dict[str, Any], optional
The filter specification. It may have the following
optional fields:
filter_by_name: str
Filter by container name.
search_by_name : str
Search by container name.
Returns
-------
List[Entity]
A list of Entity objects representing containers.
"""
return self.containers.get_entities(attributes) # type: ignore
#
# Dataset API
#
    def get_datasets(self, attributes: Dict[str, Any] = {}) -> List[Entity]:
"""
Retrieves information about datasets that have the given attributes.
Parameters
----------
attributes: Dict[str, Any], optional
The filter specification. It may have the following
optional fields:
search_key : str
Filter across fields like dataset id, and dataset name.
Returns
-------
List[Entity]
A list of Entity objects representing datasets.
"""
return self.datasets.get_entities(attributes) # type: ignore
    def get_dataset_by_name(self, name: str) -> Optional[Entity]:
"""
Retrieves a dataset with the given name.
Parameters
----------
name : str
The name of the dataset to retrieve.
Returns
-------
Entity
The Entity object
representing the dataset.
"""
return self.datasets.get_entity_by_name(name)
    def create_dataset(self, spec: Dict[str, Any]) -> Entity:
"""
Creates a new dataset entity.
Parameters
----------
spec : Dict[str, Any]
The dataset spec. The spec should have the following fields:
dataset_name : str
The name of the new dataset.
dataset_namespace : str, optional
The namespace for the dataset, by default 'default'.
data_type : DataType, optional
The type of data to store in the dataset, by default
DataType.IMAGE.
glob_pattern : str, optional
The glob pattern for the dataset, by default
For image datasets: value ='*(png|jpg|gif|jpeg|tiff|tif|bmp)'.
For video datasets: value = '*(mov|mp4|avi|wmv|mpg|mpeg|mkv)'
sample_frame_rate: float, optional
The frame rate per second (fps) for videos.
Applicable only for video datasets.
overwrite : bool, optional
Overwrite if a dataset with the same name exists.
Returns
-------
Entity
The created entity
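
        Example
        -------
        An illustrative spec, assuming `client` is an initialized
        AkriDEClient; the dataset name is a placeholder:

        >>> dataset = client.create_dataset(
        ...     spec={
        ...         "dataset_name": "my-image-dataset",
        ...         "data_type": DataType.IMAGE,
        ...         "overwrite": False,
        ...     }
        ... )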
"""
return self.datasets.create_entity(spec)
    def delete_dataset(self, dataset: Dataset) -> bool:
"""
Deletes a dataset object.
Parameters
----------
dataset : Dataset
The dataset object to delete.
Returns
-------
bool
Indicates whether this entity was successfully deleted
"""
return self.datasets.delete_entity(dataset)
    def ingest_dataset(
self,
dataset: Dataset,
data_directory: str,
use_patch_featurizer: bool = True,
with_clip_featurizer: bool = False,
async_req: bool = False,
catalog_details: Optional[CatalogDetails] = None,
) -> Optional[BackgroundTask]:
"""
Starts an asynchronous ingest task for the specified dataset.
Parameters
----------
dataset : Dataset
The dataset to ingest.
data_directory : str
The path to the directory containing the dataset files.
use_patch_featurizer: bool, optional
Ingest dataset to enable patch-based similarity searches.
with_clip_featurizer: bool, optional
Ingest dataset to enable text prompt based search.
async_req: bool, optional
Whether to execute the request asynchronously.
catalog_details: Optional[CatalogDetails]
Parameters details for creating a catalog
Returns
-------
BackgroundTask
A task object
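
        Example
        -------
        A minimal sketch, assuming `client` is an initialized AkriDEClient,
        `dataset` was returned by create_dataset, and the directory path is
        a placeholder:

        >>> task = client.ingest_dataset(
        ...     dataset=dataset,
        ...     data_directory="/data/images",
        ...     async_req=True,
        ... )
        >>> progress = client.wait_for_completion(task)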
"""
featurizer_type = (
FeaturizerType.PATCH
if use_patch_featurizer
else FeaturizerType.FULL_IMAGE
)
return self.datasets.ingest_dataset(
dataset=dataset,
data_directory=data_directory,
featurizer_type=featurizer_type,
catalog_details=catalog_details,
with_clip_featurizer=with_clip_featurizer,
async_req=async_req,
)
    def attach_pipelines(
self,
dataset: Dataset,
featurizer_types: Set[FeaturizerType],
attachment_policy_type: Optional[
AttachmentPolicyType
] = AttachmentPolicyType.PUSH_MODE,
):
"""
Attach pipelines based on the featurizer types
Parameters
----------
dataset : Dataset
            The dataset object to attach pipelines to.
featurizer_types: Set[FeaturizerType]
Featurizers to run for the dataset
attachment_policy_type: Optional[AttachmentPolicyType]
Pipeline attachment policy type
Returns
-------
None
"""
self.datasets.attach_default_pipelines(
dataset=dataset,
featurizers=featurizer_types,
attachment_policy_type=attachment_policy_type,
)
    def get_bgc_job_by_id(self, job_id: str) -> BGCJob:
"""
Get BGC job by the job id
Parameters
----------
job_id: str
Job id of the triggered BGC job
Returns
-------
BGCJob:
            The background cataloging job object
"""
return self.datasets.get_bgc_job(job_id=job_id)
    def submit_bgc_job(
self, dataset: Dataset, pipelines: List[Pipeline]
) -> BGCJob:
"""
Submits a Background Cataloging Job for the dataset
Parameters
----------
dataset : Dataset
            The dataset to submit the background cataloging job for.
pipelines : List[Pipeline]
Pipelines to run for the job
Returns
-------
BGCJob:
            The background cataloging job object
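
        Example
        -------
        A minimal sketch, assuming `client` is an initialized AkriDEClient
        and pipelines have already been attached to `dataset`:

        >>> pipelines = client.get_attached_pipelines(dataset=dataset)
        >>> bgc_job = client.submit_bgc_job(dataset=dataset, pipelines=pipelines)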
"""
return self.datasets.submit_bgc_job(
dataset=dataset, pipelines=pipelines
)
    def get_bgc_attached_pipeline_progress_report(
self, dataset: Dataset, pipeline: Pipeline
) -> BGCAttachmentJobStatus:
"""
Get Background Catalog progress for the dataset attachment
Parameters
----------
dataset : Dataset
            The dataset object to retrieve the background catalog progress for
pipeline: Pipeline
The pipeline object which is attached to dataset
Returns
-------
BGCAttachmentJobStatus:
Background Catalog status for the dataset attachment
"""
return self.datasets.get_bgc_attachment_progress_report(
dataset=dataset, pipeline=pipeline
)
    def check_if_dataset_files_to_be_registered(
self, dataset: Dataset, file_paths: List[str]
) -> bool:
"""
Check if the files are not registered for the dataset
Parameters
----------
dataset: Dataset
The dataset object
file_paths: List[str]
New files to register for the dataset
Returns
-------
bool: Indicates if files need to be registered
"""
return self.datasets.check_if_files_to_be_ingested(
dataset=dataset, files=file_paths
)
    def get_files_to_be_processed(
self,
dataset: Dataset,
pipeline: Pipeline,
batch_size: int,
) -> DatasetUnprocessedFiles:
"""
Get files to be processed for the dataset
Parameters
----------
dataset: Dataset
The dataset object
pipeline: Pipeline
The associated pipeline for which the files have to be obtained
batch_size: int
Number of files to be retrieved
Returns
-------
DatasetUnprocessedFiles
Dataset files to be processed.
"""
return self.datasets.get_files_to_be_processed(
dataset=dataset, pipeline=pipeline, batch_size=batch_size
)
    def abort_bgc_jobs(self, dataset: Dataset, job: Optional[BGCJob] = None):
"""
Aborts background cataloging jobs for the dataset
Parameters
----------
dataset : Dataset
            The dataset whose background cataloging jobs are to be aborted.
job: Optional[BGCJob]
The background catalog job object
Returns
-------
None
"""
self.datasets.abort_bgc_job(dataset=dataset, job=job)
    def import_catalog(
self,
dataset: Dataset,
table_name: str,
csv_file_path: str,
create_view: bool = True,
file_name_column: Optional[str] = None,
pipeline_name: Optional[str] = None,
import_identifier: Optional[str] = None,
) -> bool:
"""
Method for importing an external catalog into a dataset.
Parameters
----------
dataset : Dataset
The dataset to import the catalog into.
table_name : str
The name of the table to create for the catalog.
csv_file_path : str
The path to the CSV file containing the catalog data.
create_view: bool default: True
Create a view with imported catalog and primary catalog table
file_name_column: str
Name of the column in the csv file that
contains the absolute filename
pipeline_name: str
Name of pipeline whose primary table will be joined with the
imported table. Ignored if create_view is false
import_identifier: str
Unique identifier for importing data
Returns
-------
bool
Indicates whether the operation was successful.
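
        Example
        -------
        An illustrative call, assuming `client` and `dataset` were obtained
        earlier; the table name, CSV path, and column name are placeholders:

        >>> success = client.import_catalog(
        ...     dataset=dataset,
        ...     table_name="external_labels",
        ...     csv_file_path="/tmp/labels.csv",
        ...     create_view=True,
        ...     file_name_column="file_name",
        ... )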
"""
if create_view and not file_name_column:
raise ValueError(
"View creation requires `file_name_column` to be specified"
)
success = self.catalogs.import_catalog(
dataset, table_name, csv_file_path, import_identifier
)
if not success:
raise ServerError("Failed to import catalog!")
if success and create_view:
            # find the attached image pipeline to figure out which primary
            # table is to be used to create a view
pipelines: List[Pipeline] = self.get_attached_pipelines(
dataset=dataset
)
image_pipeline = None
if pipeline_name is None:
image_pipeline: Optional[
Pipeline
] = self.datasets.get_default_pipeline(
attached_pipelines=pipelines,
data_type=get_dataset_type(dataset.info.data_type),
)
else:
for pipeline in pipelines:
pipeline: Pipeline
if pipeline.get_name() == pipeline_name:
image_pipeline = pipeline
break
if not image_pipeline:
                raise UserError(
                    message=f"No pipeline '{pipeline_name}' attached to the"
                    " dataset. If view creation is not"
                    " needed, disable it by setting `create_view` to False"
                )
left_table = CatalogTable(
table_name="primary",
table_type=CatalogTableType.INTERNAL,
pipeline_id=image_pipeline.get_id(),
)
right_table = CatalogTable(
table_name=table_name, table_type=CatalogTableType.INTERNAL
)
join_condition = JoinCondition(
left_column="file_name", right_column=file_name_column
)
_ = self.create_view(
view_name=f"{table_name}_primary_view",
description="Auto created view joining {table_name} "
"with {pipeline_name} primary table",
dataset=dataset,
left_table=left_table,
right_table=right_table,
join_condition=join_condition,
)
return True
    def create_table(
self,
dataset: Dataset,
table_name: str,
schema: Dict[str, str],
indices: Optional[List[str]] = None,
) -> str:
"""
        Adds an empty external catalog table to the dataset.
Parameters
----------
dataset : Dataset
The dataset to create the catalog in.
table_name : str
The name of the table to create for the catalog.
schema : Dict[str, str]
The schema of the external catalog table
            in the format {col_name: col_type}
        indices : List[str], optional
            Names of the columns to create indexes on.
Returns
-------
str
Returns the absolute table name for the external catalog.
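
        Example
        -------
        An illustrative schema; the column type strings are passed through to
        the catalog service, so the exact type names accepted depend on the
        server (SQL-style types are assumed here):

        >>> abs_table_name = client.create_table(
        ...     dataset=dataset,
        ...     table_name="defects",
        ...     schema={"file_name": "VARCHAR", "defect_count": "INTEGER"},
        ...     indices=["file_name"],
        ... )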
"""
abs_table_name = self.catalogs.create_table(
dataset=dataset,
table_name=table_name,
schema=schema,
indices=indices,
)
return abs_table_name
    def get_view_id(
self, dataset: Dataset, view_name: str
) -> Optional[CatalogViewInfo]:
"""
Retrieves the view id for a view of a dataset
Parameters
----------
dataset : Dataset
The dataset to get the view id from
view_name : str
            The name of the view whose id is to be retrieved
Returns
-------
Optional[CatalogViewInfo]
Returns the CatalogViewInfo object
"""
return self.catalogs.get_view_id(dataset=dataset, view_name=view_name)
    def add_to_catalog(
self,
dataset: Dataset,
table_name: str,
csv_file_path: str,
import_identifier: Optional[str] = None,
) -> bool:
"""
Adds new items to an existing catalog.
Parameters
----------
dataset : Dataset
The dataset to import the catalog into.
table_name : str
The name of the table to create for the catalog.
csv_file_path : str
The path to the CSV file containing new catalog data.
import_identifier: str
Unique identifier for importing data
Returns
-------
bool
Indicates whether the operation was successful.
"""
return self.catalogs.add_to_catalog(
dataset,
table_name,
csv_file_path,
import_identifier=import_identifier,
)
    def delete_catalog(self, catalog: Catalog) -> bool:
"""
Deletes a catalog object.
Parameters
----------
catalog : Catalog
The catalog object to delete.
Returns
-------
bool
Indicates whether the operation was successful.
"""
return self.catalogs.delete_entity(catalog)
    def get_catalogs(self, attributes: Dict[str, Any] = {}) -> List[Entity]:
"""
Retrieves information about catalogs that have the given attributes.
Parameters
----------
attributes: Dict[str, Any]
The filter specification. It may have the following optional
fields:
name : str
filter by catalog name
status : str
filter by catalog status, can be one of
"active","inactive", "refreshing", "offline",
"invalid-config"
Returns
-------
List[Entity]
A list of Entity objects representing catalogs.
"""
return self.catalogs.get_entities(attributes)
    def get_catalog_by_name(
self, dataset: Dataset, name: str
) -> Optional[Entity]:
"""
Retrieves a catalog with the given name.
Parameters
----------
dataset : Dataset
The dataset to retrieve the catalog from.
name : str
The name of the catalog to retrieve.
Returns
-------
Entity
The Entity object representing the catalog.
"""
return self.catalogs.get_catalog_by_name(dataset, name)
    def get_catalog_data_count(
self,
dataset: Dataset,
table_name: str,
filter_str: Optional[str] = None,
) -> int:
"""
        Retrieves the number of rows in a catalog table that match the given
        filters.
Parameters
----------
dataset: Dataset
            The dataset containing the catalog table.
table_name: str
The catalog table name
filter_str: str
Filter the rows based on values
Returns
-------
int
The number of rows filtered
"""
return self.catalogs.get_catalog_data_count(
dataset=dataset, table_name=table_name, filter_str=filter_str
)
    def get_resultset_by_id(self, resultset_id: str) -> Entity:
"""
Retrieves a resultset with the given identifier.
Parameters
----------
        resultset_id : str
            The ID of the resultset to retrieve.
Returns
-------
Entity
The Entity object representing the resultset.
"""
return self.resultsets.get_resultset_by_id(resultset_id)
    def get_resultsets(self, attributes: Dict[str, Any] = {}) -> List[Entity]:
"""
Retrieves information about resultsets that have the given attributes.
Parameters
----------
attributes: Dict[str, Any], optional
The filter specification. It may have the following
optional fields:
search_key : str
Filter across fields like dataset id, and dataset name.
Returns
-------
List[Entity]
A list of Entity objects representing resultsets.
"""
return self.resultsets.get_entities(attributes) # type: ignore
    def get_resultset_by_name(self, name: str) -> Optional[Entity]:
"""
Retrieves a resultset with the given name.
Parameters
----------
name : str
The name of the resultset to retrieve.
Returns
-------
Entity
The Entity object representing the resultset.
"""
return self.resultsets.get_entity_by_name(name)
    def get_resultset_samples(
self, resultset: Resultset, max_sample_size: int = 10000
) -> SampleInfoList:
"""
Retrieves the samples of a resultset
Parameters
----------
resultset : Resultset
            The Resultset object to get samples for.
        max_sample_size : int, optional
            The maximum number of samples to retrieve, by default 10000.
Returns
-------
SampleInfoList
A SampleInfoList object.
"""
return self.resultsets.get_samples(resultset, max_sample_size)
    def create_resultset(self, spec: Dict[str, Any]) -> Entity:
"""
Creates a new resultset entity.
Parameters
----------
spec : Dict[str, Any]
The resultset spec. The spec should have the following fields:
job: Job
The associated job object.
name : str
The name of the new resultset.
samples: SampleInfoList
The samples to be included in this resultset.
Returns
-------
Entity
The created entity
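
        Example
        -------
        A minimal sketch, assuming `job` and `samples` were obtained earlier
        (for example via get_job_samples); the resultset name is a
        placeholder:

        >>> resultset = client.create_resultset(
        ...     spec={
        ...         "job": job,
        ...         "name": "interesting-samples",
        ...         "samples": samples,
        ...     }
        ... )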
"""
return self.resultsets.create_entity(spec) # type: ignore
    def update_resultset(
self,
resultset: Resultset,
add_list: Optional[SampleInfoList] = None,
del_list: Optional[SampleInfoList] = None,
) -> bool:
"""
Updates a resultset.
Parameters
----------
resultset: Resultset
The resultset to be updated.
add_list: SampleInfoList, optional
The list of samples to be added.
del_list: SampleInfoList, optional
The list of samples to be deleted.
Returns
-------
bool
Indicates whether the operation was successful.
"""
return self.resultsets.update_resultset(resultset, add_list, del_list)
    def delete_resultset(self, resultset: Resultset) -> bool:
"""
Deletes a resultset object.
Parameters
----------
resultset : Resultset
The resultset object to delete.
Returns
-------
bool
Indicates whether the operation was successful.
"""
return self.resultsets.delete_entity(resultset) # type: ignore
    def publish_resultset(self, resultset: Resultset) -> bool:
"""
Publishes a resultset.
Parameters
----------
resultset: Resultset
The resultset to be published.
Returns
-------
bool
Indicates whether the operation was successful.
"""
return self.resultsets.publish(resultset=resultset)
#
# Job API
#
    def create_job_spec(
self,
dataset: Dataset,
job_type: Union[str, JobType] = JobType.EXPLORE,
job_name: str = "",
predictions_file: str = "",
cluster_algo: Union[str, ClusterAlgoType] = ClusterAlgoType.HDBSCAN,
embed_algo: Union[str, EmbedAlgoType] = EmbedAlgoType.UMAP,
num_clusters: Optional[int] = None,
max_images: int = 1000,
catalog_table: Optional[CatalogTable] = None,
analyze_params: Optional[AnalyzeJobParams] = None,
pipeline: Optional[Pipeline] = None,
filters: List[Condition] = None, # type: ignore
reference_job: Job = None,
) -> JobSpec:
"""
Creates a JobSpec object that specifies how a job is to be created.
Parameters:
-----------
dataset: Dataset
The dataset to explore.
job_type : JobType, optional
The job type
job_name : str, optional
The name of the job to create. A unique name will be generated
if this is not given.
predictions_file: str, optional
The path to the catalog file containing predictions and ground
truth. This file must be formatted according to the specification
at:
https://docs.akridata.ai/docs/analyze-job-creation-and-visualization
cluster_algo : ClusterAlgoType, optional
The clustering algorithm to use.
embed_algo : EmbedAlgoType, optional
The embedding algorithm to use.
num_clusters : int, optional
The number of clusters to create.
max_images : int, optional
The maximum number of images to use.
catalog_table: CatalogTable, optional
The catalog to be used for creating this explore job. This defaults
to the internal primary catalog that is created automatically when
a dataset is created.
default: "primary"
analyze_params: AnalyzeJobParams, optional
            Analyze job related configuration parameters
        pipeline: Pipeline, optional
            The pipeline to use. Defaults to the dataset's default attached
            pipeline for its data type.
        filters : List[Condition], optional
            The filters to be used to select a subset of samples for this job.
            These filters are applied to the catalog specified by
            catalog_table.
reference_job: Job, optional
            The reference job for this compare job

        Returns
        -------
        JobSpec
            The job specification object.
"""
if pipeline is None:
pipelines: List[Pipeline] = self.get_attached_pipelines(
dataset=dataset
)
pipeline = self.datasets.get_default_pipeline(
attached_pipelines=pipelines,
data_type=get_dataset_type(dataset.info.data_type),
)
if catalog_table is None:
catalog_table = CatalogTable(table_name="primary")
is_compare = False
if job_type == JobType.COMPARE:
is_compare = True
job_type = JobType.EXPLORE
return JobSpec(
dataset=dataset,
job_type=job_type,
job_name=job_name,
predictions_file=predictions_file,
cluster_algo=cluster_algo,
embed_algo=embed_algo,
num_clusters=num_clusters,
max_images=max_images,
catalog_table=catalog_table,
filters=filters,
pipeline=pipeline,
analyze_params=analyze_params,
is_compare=is_compare,
reference_job=reference_job,
)
    def create_job(self, spec: JobSpec) -> Job:
"""
Creates an explore job for the specified dataset.
Parameters:
-----------
        spec: JobSpec
            The job specification, as created by create_job_spec().
Returns:
--------
Job
The newly created Job object.
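
        Example:
        --------
        A minimal sketch combining create_job_spec and create_job, assuming
        `client` and `dataset` were obtained earlier; the job name is a
        placeholder:

        >>> spec = client.create_job_spec(
        ...     dataset=dataset,
        ...     job_type=JobType.EXPLORE,
        ...     job_name="explore-run-1",
        ...     max_images=1000,
        ... )
        >>> job = client.create_job(spec)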
"""
return self.jobs.create_entity(spec) # type: ignore
    def delete_job(self, job: Job) -> bool:
"""
Deletes a job object.
Parameters
----------
job : Job
The job object to delete.
Returns
-------
bool
Indicates whether the operation was successful.
"""
raise NotImplementedError
    def get_job_by_name(self, name: str) -> Job:
"""
Retrieves a job with the given name.
Parameters
----------
name : str
The name of the job to retrieve.
Returns
-------
Entity
The Entity object representing the job.
"""
return self.jobs.get_entity_by_name(name) # type: ignore
    def get_jobs(self, attributes: Dict[str, Any] = {}) -> List[Entity]:
"""
Retrieves information about jobs that have the given attributes.
Parameters
----------
attributes: Dict[str, Any]
The filter specification. It may have the following
optional fields:
data_type : str
The data type to filter on. This can be 'IMAGE' or 'VIDEO'.
job_type : str
The job type to filter on - 'EXPLORE', 'ANALYZE' etc.
search_key : str
Filter jobs across fields like job name, dataset id, and
dataset name.
Returns
-------
List[Entity]
A list of Entity objects representing jobs.
"""
return self.jobs.get_entities(attributes) # type: ignore
    def get_compatible_reference_jobs(
self,
dataset: Dataset,
pipeline: Pipeline,
catalog_table: CatalogTable,
search_key: str = None,
) -> List[Job]:
"""
        Retrieves jobs created from the given catalog_table that can be used
        as reference jobs when creating JobType.COMPARE jobs.
Parameters
----------
dataset: Dataset
The dataset to explore.
pipeline: Pipeline
The pipeline to use.
        catalog_table: CatalogTable
            The catalog table to use for creating the compare job.
search_key: str
Filter jobs across fields like job name
Returns
-------
List[Entity]
A list of Entity objects representing jobs.
"""
return self.jobs.get_compatible_reference_jobs(
dataset, pipeline, catalog_table, search_key
)
    def get_thumbnail_images(
self, samples: SampleInfoList
) -> List[Image.Image]:
"""
Retrieves the thumbnail images corresponding to the samples.
Parameters
----------
samples : SampleInfoList
The samples to retrieve thumbnails for.
Returns
-------
List[Image.Image]
A list of thumbnail images.
"""
if samples.job_id:
return self.jobs.get_thumbnail_images(samples)
return self.resultsets.get_thumbnail_images(samples)
    def get_fullres_images(self, samples: SampleInfoList) -> List[Image.Image]:
        """
        Retrieves the full-resolution images for the given samples.
Parameters
----------
samples : SampleInfoList
The samples to retrieve images for.
Returns
-------
List[Image.Image]
A list of images.
"""
return self.jobs.get_fullres_images(samples)
    def get_fullres_image_urls(self, samples: SampleInfoList) -> Dict:
        """
        Retrieves the full-resolution image URLs for the given samples.
Parameters
----------
samples : SampleInfoList
The samples to retrieve full res image urls for.
Returns
-------
Dict
A dictionary containing the full-resolution image URLs for
each sample.
"""
if not samples:
raise ValueError("'samples' cannot be None")
        if not isinstance(samples, SampleInfoList):
            raise TypeError(
                f"Invalid argument type: {type(samples)}. "
                "Expected type: SampleInfoList"
            )
return self.jobs.get_fullres_image_urls(samples)
    def get_job_statistics(
self, job: Job, context: JobStatisticsContext, **kwargs
) -> JobStatistics:
"""
Retrieves statistics info from an analyze job.
Parameters
----------
job : Job
The Job object to get statistics for.
context: JobStatisticsContext
The type of statistics to retrieve.
**kwargs: Additional keyword arguments
Supported keyword arguments
iou_config_threshold: float, optional
Threshold value for iou config
confidence_score_threshold: float, optional
Threshold value for confidence score
Returns
-------
JobStatistics
A job statistics object.
"""
return self.jobs.get_job_statistics(job, context, **kwargs)
    def get_job_samples(
self,
job: Job,
job_context: JobContext,
spec: Union[
SimilaritySearchSpec,
ConfusionMatrixCellSpec,
ClusterRetrievalSpec,
CoresetSamplingSpec,
],
**kwargs,
) -> SampleInfoList:
"""
Retrieves the samples according to the given specification.
Parameters
----------
job : Job
The Job object to get samples for.
job_context: JobContext
The context in which the samples are requested for.
spec: Union[
SimilaritySearchSpec,
ConfusionMatrixCellSpec,
ClusterRetrievalSpec,
CoresetSamplingSpec
]
The job context spec.
**kwargs: Additional keyword arguments
Supported keyword arguments
iou_config_threshold: float, optional
Threshold value for iou config
confidence_score_threshold: float, optional
Threshold value for confidence score
Returns
-------
SampleInfoList
A SampleInfoList object.
"""
return self.jobs.get_samples(job, job_context, spec, **kwargs)
    def get_job_samples_from_file_path(
self,
job: Job,
file_info: List[str],
) -> Dict:
"""
        Retrieves the point IDs for the samples at the given file paths.
Parameters
----------
job : Job
            The Job object to get samples for.
        file_info: List[str]
            List of file paths for the images of interest
Returns
-------
Dict
            A dictionary mapping file paths to point IDs
"""
return self.jobs.get_samples_from_file_path(job, file_info)
    def get_job_display_panel(
self,
job: Job,
) -> str:
"""
        Retrieves the job panel URL in the Data Explorer.
Parameters
----------
job : Job
The Job object to be queried.
Returns
-------
str
The job panel URL.
"""
return (
f"https://{self.host}/"
f"{self.jobs.get_job_display_panel_uri(job)}"
)
#
# Common API
#
    def get_progress_info(self, task: BackgroundTask) -> ProgressInfo:
"""
Gets the progress of the specified task.
Parameters
----------
task : BackgroundTask
The task object to retrieve the progress information for.
Returns
-------
ProgressInfo
The progress information
"""
return task.get_progress_info()
    def wait_for_completion(self, task: BackgroundTask) -> ProgressInfo:
"""
Waits for the specified task to complete.
Parameters
----------
        task : BackgroundTask
            The task object to wait for.
Returns
-------
ProgressInfo
The progress information
"""
return task.wait_for_completion()
    def create_view(
self,
view_name: str,
description: Optional[str],
dataset: Dataset,
left_table: CatalogTable,
right_table: CatalogTable,
join_condition: JoinCondition,
inner_join: bool = False,
    ) -> str:
"""Create a SQL view for visualization
Note: Left join is used by default while creating the view
Args:
view_name (str): Name of the view to create
description (Optional[str]): Description text
dataset (Dataset): Dataset object
            left_table (CatalogTable): Left table of the create view query
            right_table (CatalogTable): Right table of the create view query
join_condition (JoinCondition): JoinCondition which includes the
column from the left and the right table
inner_join (bool): Use inner join for joining the tables
Returns:
str: view id
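
        Example:
            A minimal sketch, assuming `client` is an initialized
            AkriDEClient, the referenced tables already exist for the dataset,
            and the table and column names below are placeholders::

                view_id = client.create_view(
                    view_name="labels_primary_view",
                    description="Join labels with the primary table",
                    dataset=dataset,
                    left_table=CatalogTable(table_name="primary"),
                    right_table=CatalogTable(table_name="external_labels"),
                    join_condition=JoinCondition(
                        left_column="file_name", right_column="file_name"
                    ),
                )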
"""
return self.catalogs.create_view(
view_name,
description,
dataset,
left_table,
right_table,
join_condition,
inner_join,
)
    def get_attached_pipelines(
self, dataset: Dataset, version: Optional[str] = None
) -> List[Pipeline]:
"""Get pipelines attached for dataset given a dataset version
Args:
dataset (Dataset): Dataset object
version (str, optional): Dataset version. Defaults to None in which
case the latest version would be used
Returns:
List[Pipeline]: List of pipelines attached with the dataset
"""
return self.datasets.get_attached_pipelines(dataset, version)
    def get_all_columns(
self, dataset: Dataset, table: CatalogTable
) -> List[Column]:
"""Returns all columns for a table/view
Args:
dataset (Dataset): Dataset object
            table (CatalogTable): Table information
Returns:
List[Column]: List of columns of the table
"""
return self.catalogs.get_all_columns(dataset, table)
    def get_secrets(self, name: str, namespace: str) -> Optional[SMSSecrets]:
"""
Retrieves information about SMS Secret for
the given SMS secret name and namespace.
Parameters
----------
name: str
Filter across SMS Secret Key
namespace: str
Filter across SMS Secret Namespace
Returns
-------
SMSSecrets
Object representing Secrets.
"""
attributes = {"sms_key": name, "sms_namespace": namespace}
secrets = self.sms_secrets.get_entities(attributes)
if len(secrets):
return secrets[0]
return None
    def get_repository_by_name(self, name: str) -> Optional[Entity]:
"""
Retrieves a Docker repository with the given name.
Parameters
----------
name : str
The name of the Repository to retrieve.
Returns
-------
Entity
Object representing the Docker Repository.
"""
return self.repository.get_entity_by_name(name)
    def get_docker_image(self, name: str) -> Optional[Entity]:
"""
Retrieves a Docker Image with the given name.
Parameters
---------
name : str
The name of the Docker Image to retrieve
Returns
------
Entity
The Entity object representing the Docker Image.
"""
return self.docker_images.get_entity_by_name(name)
    def create_featurizer_image_spec(
self,
image_name: str,
description: str,
command: str,
repository_name: str,
properties: Dict[str, Any],
gpu_filter: Optional[bool] = None,
gpu_mem_fraction: Optional[float] = None,
allow_no_gpu: Optional[bool] = None,
namespace: Optional[str] = "default",
image_tag: Optional[str] = "latest",
name: Optional[str] = None,
) -> DockerImageSpec:
"""
Creates a DockerImageSpec object that specifies the
Featurizer Docker Image to be created
Parameters:
-----------
image_name : str
The name of the Docker Image present in the repository
description : str
A short description of the Docker Image
command: str
Command that is used to run the featurizer docker
repository_name: str
Name of the repository in DE, the Docker Image will be pulled from.
properties: Dict[str, Any]
Properties specific to the Docker Image
gpu_filter: Optional[bool]
            Flag to specify whether the Image should be run on a GPU
        gpu_mem_fraction: Optional[float]
            The fraction of GPU memory to be reserved for the Docker Image.
            Should be > 0 and <= 1
allow_no_gpu: Optional[bool]
Flag to specify if the Image can also be run if no GPU is available
namespace: Optional[str]
            Namespace of the Docker Image, by default 'default'
        image_tag: Optional[str]
            Tag of the Docker Image in the docker repository,
            by default "latest"
        name: Optional[str]
            Display name of the Docker Image on DE,
            by default the same as image_name
Returns
------
DockerImageSpec
Object representing a Docker Image Specification
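
        Example
        -------
        An illustrative sketch; the image, command, and repository names are
        placeholders for values configured in your Data Explorer deployment:

        >>> spec = client.create_featurizer_image_spec(
        ...     image_name="my-featurizer",
        ...     description="Custom featurizer",
        ...     command="python /app/featurize.py",
        ...     repository_name="my-docker-repo",
        ...     properties={},
        ...     gpu_filter=True,
        ...     allow_no_gpu=True,
        ... )
        >>> docker_image = client.register_docker_image(spec)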
"""
resolved_repository = self.get_repository_by_name(name=repository_name)
if not resolved_repository:
raise ValueError("Repository does not exist")
repository_id = resolved_repository.id
return DockerImageSpec(
name=name,
namespace=namespace,
repository_id=repository_id,
description=description,
image_name=image_name,
image_tag=image_tag,
command=command,
properties=properties,
gpu_filter=gpu_filter,
gpu_mem_fraction=gpu_mem_fraction,
allow_no_gpu=allow_no_gpu,
)
    def register_docker_image(
self, spec: DockerImageSpec
) -> Optional[DockerImage]:
"""
Registers a Docker Image
Parameters
---------
spec : DockerImageSpec
Docker Image Specification
Returns
------
DockerImage
Object representing the Docker Image
"""
return self.docker_images.create_entity(spec) # type: ignore
    def create_featurizer_pipeline_spec(
self,
pipeline_name: str,
pipeline_description: str,
featurizer_name: str,
data_type: Optional[str] = DataType.IMAGE,
namespace: Optional[str] = "default",
) -> DockerPipelineSpec:
"""
Creates a DockerImageSpec object that specifies
the Featurizer Docker Image to be created
Parameters:
-----------
pipeline_name : str
The name of the Docker pipeline
pipeline_description : str
A short description of the Docker Pipeline
featurizer_name: str
Docker Image name of the featurizer to uniquely identify the image.
data_type: Optional[str]
Data Type of the pipeline, by default DataType.IMAGE.
Allowed values are DataType.IMAGE, DataType.VIDEO
namespace: Optional[str]
            Namespace of the Docker Pipeline, by default 'default'
Returns
------
DockerPipelineSpec
Object representing a Docker Pipeline Specification
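
        Example
        -------
        A minimal sketch, assuming a featurizer Docker Image named
        "my-featurizer" has already been registered:

        >>> pipeline_spec = client.create_featurizer_pipeline_spec(
        ...     pipeline_name="my-featurizer-pipeline",
        ...     pipeline_description="Pipeline using my custom featurizer",
        ...     featurizer_name="my-featurizer",
        ...     data_type=DataType.IMAGE,
        ... )
        >>> docker_pipeline = client.create_docker_pipeline(pipeline_spec)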
"""
featurizer_docker_resp = self.get_docker_image(name=featurizer_name)
if not featurizer_docker_resp:
raise ValueError(f"Featurizer - {featurizer_name} does not exist")
pre_processor_docker_resp = self.get_docker_image(
name=AkridataDockerNames.AKRIDATA_IMAGE_PREPROCESSOR
)
if not pre_processor_docker_resp:
raise ValueError("Error Fetching Preprocessor Image")
thumbnail_docker_resp = self.get_docker_image(
name=AkridataDockerNames.AKRIDATA_THUMBNAIL_GENERATOR
)
if not thumbnail_docker_resp:
raise ValueError("Error Fetching Thumbnail Image")
featurizer_pipeline_docker = PipelineDocker(
id=featurizer_docker_resp.id
)
preprocessor_pipeline_docker = PipelineDocker(
id=pre_processor_docker_resp.id
)
thumbnail_pipeline_docker = PipelineDocker(id=thumbnail_docker_resp.id)
return DockerPipelineSpec(
pipeline_name=pipeline_name,
pipeline_description=pipeline_description,
featurizer_docker=featurizer_pipeline_docker,
pre_processor_docker=preprocessor_pipeline_docker,
thumbnail_docker=thumbnail_pipeline_docker,
data_type=data_type,
namespace=namespace,
)
    def create_docker_pipeline(
self, spec: DockerPipelineSpec
) -> Optional[DockerPipeline]:
"""
Creates a Pipeline using the Docker Image
Parameters
---------
spec : DockerPipelineSpec
Pipeline Specification
Returns
------
DockerPipeline
object representing the Docker Pipeline
"""
return self.docker_pipelines.create_entity(spec) # type: ignore
    def attach_pipeline_to_dataset(
self,
pipeline_id,
dataset_id,
attachment_policy_type: Optional[
AttachmentPolicyType
] = AttachmentPolicyType.ON_DEMAND,
):
"""
        Attaches a docker pipeline to a dataset
Parameters
----------
dataset_id : str
The dataset id representing a dataset
pipeline_id: str
The pipeline id representing a docker pipeline
attachment_policy_type: Optional[AttachmentPolicyType]
Pipeline attachment policy type, by default "ON_DEMAND"
Returns
-------
None
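
        Example
        -------
        A minimal sketch, assuming `docker_pipeline` and `dataset` expose
        their identifiers via get_id() like other entities in this module:

        >>> client.attach_pipeline_to_dataset(
        ...     pipeline_id=docker_pipeline.get_id(),
        ...     dataset_id=dataset.get_id(),
        ... )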
"""
self.datasets.attach_pipeline(
dataset_id=dataset_id,
pipeline_id=pipeline_id,
attachment_policy_type=attachment_policy_type,
)