logml.analysis.dag

Classes

DagResult(dag[, jobs_results])

Result of the dag execution.

PipelineDag(steps)

Directed Acyclic Graph for Logml steps.

class logml.analysis.dag.PipelineDag(steps: List[logml.analysis.config.PipelineItemConfig])

Bases: object

Directed Acyclic Graph for Logml steps.

classmethod from_config(cfg: GlobalConfig, analysis_only=True) PipelineDag

Generate a new dag from config.

get_sorted_steps_ids() List[str]

Return step ids in topological order, suitable for correct sequential execution or human readability.
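A topological order lists every step after all of its dependencies. As an illustration (not the library's implementation), the ordering can be sketched with Kahn's algorithm; the step names below are hypothetical:

```python
from collections import deque
from typing import Dict, List

def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    """Return step ids so that every step comes after its dependencies.

    ``deps`` maps a step id to the ids it depends on (Kahn's algorithm).
    """
    indegree = {step: len(parents) for step, parents in deps.items()}
    children: Dict[str, List[str]] = {step: [] for step in deps}
    for step, parents in deps.items():
        for parent in parents:
            children[parent].append(step)
    queue = deque(step for step, deg in indegree.items() if deg == 0)
    order = []
    while queue:
        step = queue.popleft()
        order.append(step)
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    if len(order) != len(deps):
        raise ValueError("cycle detected: not a DAG")
    return order

# Hypothetical pipeline: transform -> train -> report
deps = {
    "modeling_data_transform": [],
    "train_model": ["modeling_data_transform"],
    "generate_report": ["train_model"],
}
print(topological_order(deps))
# ['modeling_data_transform', 'train_model', 'generate_report']
```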

create_jobs_schedule(cfg: GlobalConfig = None, df: Optional[pd.DataFrame] = None, strata_shapes: Optional[Dict[str, tuple]] = None) JobsSchedule

Generate jobs from pipeline steps.

get_dependent_jobs(start_node) List[str]

Return jobs depending on current node (BFS).
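A breadth-first walk over the "depends on me" edges collects every downstream job. This is a minimal sketch assuming a plain adjacency mapping, not the PipelineDag internals; the job names are hypothetical:

```python
from collections import deque
from typing import Dict, List

def dependent_jobs(children: Dict[str, List[str]], start_node: str) -> List[str]:
    """Breadth-first walk over ``children`` edges (job -> jobs that need it)."""
    seen, queue, result = {start_node}, deque([start_node]), []
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                result.append(child)
                queue.append(child)
    return result

# Hypothetical edges: both models feed model selection, then a report.
children = {
    "train_model_a": ["select_models"],
    "train_model_b": ["select_models"],
    "select_models": ["generate_report"],
}
print(dependent_jobs(children, "train_model_a"))
# ['select_models', 'generate_report']
```

The same traversal run over the reversed edges yields the ancestor set, as in get_all_dependencies.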

COLORMAP = {'combine_fi': 'brown', JobStatus.DEPENDENCY_FAILED: 'brown', 'extract_fi': 'brown', JobStatus.FAILED: 'red', 'generate_report': 'cyan', 'modeling_data_transform': 'green', JobStatus.OK: 'green', JobStatus.RUNNING: 'black', 'select_models': 'blue', 'survival_analysis': 'magenta', 'train_model': 'blue', JobStatus.WAIT_START: 'gray'}
make_image(filename=None, figsize=(22, 22), ax=None, dag_result: Optional[logml.analysis.dag.DagResult] = None) None

Generate graph image.

get_all_dependencies(start_node) List[str]

Get all steps that the node depends on (BFS).

to_dataframe(exclude: Optional[Union[Set[Any], Dict[str, Any]]] = None) pandas.core.frame.DataFrame

Returns table form of the dag.

class logml.analysis.dag.DagResult(dag: logml.analysis.dag.PipelineDag, jobs_results: Optional[Dict[str, logml.analysis.common.JobResult]] = None)

Bases: object

Result of the dag execution.

dag: logml.analysis.dag.PipelineDag
jobs_results: Dict[str, logml.analysis.common.JobResult] = None
remove_steps(folders: logml.analysis.common.DagOutputStructure) int

Removes steps state files.

load_steps(folders: logml.analysis.common.DagOutputStructure, reset: bool = False, debug: bool = False) int

Load steps execution data.

Parameters
  • folders – DagOutputStructure with paths in which to look for JobResult dump files.

  • reset

    When True, DAG execution state is reset to be consistent with new execution:

    • only successful jobs are kept in the list.

    • if any task is still in progress, an exception is raised, since a consistent

      state cannot be established. Manually check the running jobs in this case.

    When False, this method is used for reporting purposes and does not adjust anything.

  • debug – When True, running jobs are ignored. Do not use in production.

Returns: number of jobs results loaded.
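The reset semantics described above (keep only successful jobs, refuse to reset while anything is running) can be sketched as follows. The JobStatus enum here is a simplified stand-in for logml.analysis.common.JobStatus, and reset_for_new_run is a hypothetical helper, not the library's method:

```python
from enum import Enum
from typing import Dict

class JobStatus(Enum):
    # Simplified stand-in for logml.analysis.common.JobStatus.
    WAIT_START = "wait_start"
    RUNNING = "running"
    OK = "ok"
    FAILED = "failed"

def reset_for_new_run(jobs: Dict[str, JobStatus], debug: bool = False) -> Dict[str, JobStatus]:
    """Keep only successful jobs; refuse to reset while jobs are still running."""
    running = [name for name, st in jobs.items() if st is JobStatus.RUNNING]
    if running and not debug:
        # A running job means no consistent state can be established.
        raise RuntimeError(f"cannot reset: jobs still running: {running}")
    return {name: st for name, st in jobs.items() if st is JobStatus.OK}

jobs = {"transform": JobStatus.OK, "train": JobStatus.FAILED, "report": JobStatus.WAIT_START}
print(reset_for_new_run(jobs))
# {'transform': <JobStatus.OK: 'ok'>}
```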

try_load_step_result(filename: str)
reset_state(folders: logml.analysis.common.DagOutputStructure, debug: bool = False)

Reset dag state to a consistent one.

propagate_status(name: str, status: logml.analysis.common.JobStatus)

Set status for all dependent jobs.

has_failed_dependecies(name: str) bool

Return whether any step this step depends on has failed.

get_results_as_dataframe() Optional[pandas.core.frame.DataFrame]

Returns results as a dataframe.

generate_report() dict

Statistics of dag performance.
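Such a report typically aggregates per-job results into counts and totals. A minimal sketch, assuming jobs_results maps job names to (status, seconds) pairs as a simplified stand-in for logml JobResult objects:

```python
from collections import Counter
from typing import Dict, Tuple

def dag_report(jobs_results: Dict[str, Tuple[str, float]]) -> dict:
    """Summarize dag execution: job counts per status and total runtime."""
    statuses = Counter(status for status, _ in jobs_results.values())
    return {
        "total_jobs": len(jobs_results),
        "by_status": dict(statuses),
        "total_seconds": sum(sec for _, sec in jobs_results.values()),
    }

# Hypothetical execution results.
results = {
    "transform": ("ok", 12.5),
    "train": ("ok", 240.0),
    "report": ("failed", 3.0),
}
print(dag_report(results))
# {'total_jobs': 3, 'by_status': {'ok': 2, 'failed': 1}, 'total_seconds': 255.5}
```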