logml.analysis.main

Functions

_execute_step([filename, results, runner, ...])

Run one step, identified by step_name.

_parallel_steps_execution_loop(n_jobs, ...)

Performs a loop until all DAG steps have been invoked.

create_mp_step_process(global_params, step_name)

Create an mp.Process object to invoke a step in a separate process.

describe_dag_state([run_name, output_path])

Summarize DAG state: execution progress, failure statistics, etc.

estimate_data_size(cfg, global_params)

Load the dataframe and check its size.

execute_analysis_dag([config_path, errors])

Entry point for complete DAG execution.

execute_analysis_dag_parallel(cfg, global_params)

Executes steps in parallel.

execute_analysis_dag_sequential(cfg, ...[, ...])

Executes steps sequentially in a single process.

execute_analysis_dag_step([config_path, ...])

Entry point for step execution.

generate_analysis_dag([dump_dag_files, ...])

Generates the DAG, a new config, and the scheduler file.

get_ready_to_start_steps(process_states)

Fetches steps that are waiting to start and whose dependencies have all completed successfully.

init_process_states(folders, global_params, ...)

Create process states for valid DAG steps.

parse_arguments(global_params)

Parses the given input arguments and returns arguments for the runner.

prepare_runner(cfg, folders, global_params)

Create runner and dag, load previous DAG states.

Classes

JobProcessState(item_cfg, result, process)

Contains the state of the process that executes the job.

class logml.analysis.main.JobProcessState(item_cfg: logml.analysis.config.PipelineItemConfig, result: logml.analysis.common.JobResult, process: multiprocessing.context.Process)

Bases: object

Contains the state of the process that executes the job.

item_cfg: logml.analysis.config.PipelineItemConfig
result: logml.analysis.common.JobResult
process: multiprocessing.context.Process
start()
set_status(status, filename, error=None)
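For orientation, here is a self-contained sketch of such a per-step state holder. Field and method names follow the signatures above, but the types and behavior shown are assumptions, not the library's actual implementation:

```python
from dataclasses import dataclass
from typing import Optional
import multiprocessing as mp

@dataclass
class JobResult:
    """Hypothetical stand-in for logml.analysis.common.JobResult."""
    status: str = "waiting"
    filename: Optional[str] = None
    error: Optional[str] = None

@dataclass
class JobProcessState:
    """State of the process that executes one job (sketch)."""
    item_cfg: dict                      # stand-in for PipelineItemConfig
    result: JobResult
    process: Optional[mp.Process] = None

    def start(self) -> None:
        # Assumed behavior: launch the worker and mark the job running.
        if self.process is not None:
            self.process.start()
        self.result.status = "running"

    def set_status(self, status: str, filename: str,
                   error: Optional[str] = None) -> None:
        # Record the outcome reported for this job.
        self.result.status = status
        self.result.filename = filename
        self.result.error = error
```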
logml.analysis.main.create_mp_step_process(global_params, step_name) → multiprocessing.context.Process

Create an mp.Process object to invoke a step in a separate process.
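A minimal sketch of this pattern with a hypothetical worker function (the real library wires the step runner into the target; this only illustrates the mp.Process construction):

```python
import multiprocessing as mp

def _run_step(step_name: str, queue: mp.Queue) -> None:
    """Hypothetical worker: execute one step and report its status."""
    # The real library would invoke the step runner here.
    queue.put((step_name, "success"))

def create_step_process(step_name: str, queue: mp.Queue) -> mp.Process:
    # Name the process after the step so it shows up clearly in logs.
    return mp.Process(target=_run_step, args=(step_name, queue), name=step_name)

if __name__ == "__main__":
    q = mp.Queue()
    p = create_step_process("load_data", q)
    p.start()
    p.join()
    print(q.get())  # -> ("load_data", "success")
```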

logml.analysis.main.get_ready_to_start_steps(process_states: Dict[str, logml.analysis.main.JobProcessState]) → set

Fetches steps that are waiting to start and whose dependencies have all completed successfully.
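The selection logic can be illustrated with a self-contained sketch over plain dicts (status names and the deps mapping are assumptions; the real function works on JobProcessState objects):

```python
def ready_steps(states: dict, deps: dict) -> set:
    """Return steps that are waiting and whose prerequisites all succeeded.

    states: step name -> 'waiting' | 'running' | 'success' | 'failed'
    deps:   step name -> set of prerequisite step names
    """
    return {
        step
        for step, status in states.items()
        if status == "waiting"
        and all(states.get(d) == "success" for d in deps.get(step, ()))
    }

states = {"load": "success", "fit": "waiting", "report": "waiting"}
deps = {"fit": {"load"}, "report": {"fit"}}
print(ready_steps(states, deps))  # -> {'fit'}
```

Only `fit` is ready: `load` has already succeeded, while `report` still waits on `fit`.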

logml.analysis.main.execute_analysis_dag_parallel(cfg, global_params, n_jobs=1, logger=None, total_timeout=86400, check_timeout=10, mp_debug=False, **kwargs) → int

Executes steps in parallel.

logml.analysis.main.init_process_states(folders, global_params, logger, results, runner)

Create process states for valid DAG steps.

logml.analysis.main.execute_analysis_dag_sequential(cfg, global_params, logger=None, errors='report', **kwargs) → int

Executes steps sequentially in a single process.

logml.analysis.main.prepare_runner(cfg, folders, global_params, reset_dag=None, logger=None)

Create runner and dag, load previous DAG states.

logml.analysis.main.execute_analysis_dag(config_path=None, errors='report', **kwargs) → int

Entry point for complete DAG execution.

DAG steps are executed sequentially, in topological order, which guarantees that all of a step's dependencies complete before the step starts. Failed steps automatically propagate their status downstream, marking dependent steps as ‘dependency failed’.

Returns

0 if all steps succeeded, -1 if any step failed.
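The execution and failure-propagation scheme can be sketched with the standard library's graphlib (an illustrative simplification, not the library's actual code; run_step is a hypothetical callback):

```python
from graphlib import TopologicalSorter

def execute_dag(deps: dict, run_step) -> int:
    """Run steps in topological order, propagating failures downstream.

    deps: step name -> set of prerequisite step names
    run_step: callable(step) -> bool (True on success)
    """
    status = {}
    for step in TopologicalSorter(deps).static_order():
        if any(status.get(d) != "success" for d in deps.get(step, ())):
            # A prerequisite did not succeed: skip the step entirely.
            status[step] = "dependency failed"
            continue
        status[step] = "success" if run_step(step) else "failed"
    return 0 if all(s == "success" for s in status.values()) else -1

deps = {"load": set(), "fit": {"load"}, "report": {"fit"}}
# 'load' fails, so 'fit' and 'report' end up as 'dependency failed'.
rc = execute_dag(deps, run_step=lambda step: step != "load")
print(rc)  # -> -1
```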

logml.analysis.main.execute_analysis_dag_step(config_path=None, item_names=None, job_completion_file=None, **kwargs) → int

Entry point for step execution.

Returns

0 if all steps succeeded, -1 if any step failed.

logml.analysis.main.estimate_data_size(cfg, global_params) → Tuple[pandas.core.frame.DataFrame, Dict[str, tuple]]

Load the dataframe and check its size.

logml.analysis.main.generate_analysis_dag(dump_dag_files=True, make_image=False, **kwargs) → Tuple[GlobalConfig, logml.analysis.common.JobsSchedule]

Generates the DAG, a new config, and the scheduler file.

logml.analysis.main.describe_dag_state(run_name: Optional[str] = None, output_path: Optional[str] = None)

Summarize DAG state: execution progress, failure statistics, etc.

logml.analysis.main.parse_arguments(global_params: dict)

Parses the given input arguments and returns arguments for the runner.