logml.analysis.main
Functions
- Run one step_name.
- Performs a loop until all DAG steps are invoked.
- create_mp_step_process: Create an mp.Process object to invoke a step in a separate process.
- describe_dag_state: Summarize DAG state (execution and failure stats, etc.).
- estimate_data_size: Load the dataframe and check its size.
- execute_analysis_dag: Entry point for complete DAG execution.
- execute_analysis_dag_parallel: Executes steps in parallel.
- execute_analysis_dag_sequential: Executes steps sequentially in a single process.
- execute_analysis_dag_step: Entry point for executing selected steps.
- generate_analysis_dag: Generates the DAG, a new config, and a scheduler file.
- get_ready_to_start_steps: Fetches steps that are waiting to start and whose dependencies have all completed successfully.
- init_process_states: Create process states for valid DAG steps.
- parse_arguments: Parses a given set of input arguments and returns arguments for the runner.
- prepare_runner: Create the runner and DAG, and load previous DAG states.
Classes
- JobProcessState: Contains the state of the process that executes the job.
- class logml.analysis.main.JobProcessState(item_cfg: logml.analysis.config.PipelineItemConfig, result: logml.analysis.common.JobResult, process: multiprocessing.context.Process)
Bases: object
Contains the state of the process that executes the job.
- result: logml.analysis.common.JobResult
- process: multiprocessing.context.Process
- start()
- set_status(status, filename, error=None)
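For orientation, here is a minimal sketch of a holder with the same interface; the attribute names assumed on the result object (status, filename, error) are illustrative, not logml's documented API.

```python
# Minimal sketch of a JobProcessState-like holder. The attributes written
# onto `result` in set_status are assumptions, not logml's documented API.
from dataclasses import dataclass
from multiprocessing import Process
from typing import Any, Optional

@dataclass
class ProcessStateSketch:
    item_cfg: Any      # stands in for logml.analysis.config.PipelineItemConfig
    result: Any        # stands in for logml.analysis.common.JobResult
    process: Process

    def start(self) -> None:
        # Launch the worker process that executes the job.
        self.process.start()

    def set_status(self, status: str, filename: str,
                   error: Optional[BaseException] = None) -> None:
        # Record the job outcome on the result object (assumed field names).
        self.result.status = status
        self.result.filename = filename
        self.result.error = error
```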
- logml.analysis.main.create_mp_step_process(global_params, step_name) → multiprocessing.context.Process
Create an mp.Process object to invoke a step in a separate process.
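As a sketch of the pattern (not logml's actual implementation), a step can be wrapped in its own process by pointing mp.Process at a worker function; run_step below is hypothetical.

```python
# Illustrative only: run one DAG step in a child process.
# The worker target run_step is hypothetical.
import multiprocessing as mp

def run_step(global_params: dict, step_name: str) -> None:
    # Hypothetical worker body executed in the child process.
    print(f"running step {step_name!r}")

def make_step_process(global_params: dict, step_name: str) -> mp.Process:
    return mp.Process(target=run_step, args=(global_params, step_name),
                      name=step_name)

if __name__ == "__main__":
    p = make_step_process({}, "load_data")  # "load_data" is a placeholder
    p.start()
    p.join()
```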
- logml.analysis.main.get_ready_to_start_steps(process_states: Dict[str, logml.analysis.main.JobProcessState]) → set
Fetches steps that are waiting to start and whose dependencies have all completed successfully.
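The readiness rule can be expressed as a set computation; the 'waiting'/'success' status values and the dependencies attribute below are assumptions about the state objects, not documented fields.

```python
# Sketch of the readiness check: a step may start once it is still waiting
# and all of its dependencies have finished successfully. The status values
# and the .dependencies attribute are assumed names.
from typing import Dict, Set

def ready_to_start(states: Dict[str, object]) -> Set[str]:
    succeeded = {n for n, s in states.items() if s.status == "success"}
    return {
        n for n, s in states.items()
        if s.status == "waiting" and set(s.dependencies) <= succeeded
    }
```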
- logml.analysis.main.execute_analysis_dag_parallel(cfg, global_params, n_jobs=1, logger=None, total_timeout=86400, check_timeout=10, mp_debug=False, **kwargs) → int
Executes steps in parallel.
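A possible invocation using only the keyword arguments shown in the signature above; cfg and global_params are assumed to have been prepared elsewhere.

```python
# cfg and global_params are assumed to come from earlier setup
# (e.g., generate_analysis_dag / parse_arguments); values are illustrative.
rc = execute_analysis_dag_parallel(
    cfg,
    global_params,
    n_jobs=4,             # run up to four steps concurrently
    total_timeout=86400,  # overall budget for the whole DAG, in seconds
    check_timeout=10,     # check interval/timeout (assumed semantics)
)
```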
- logml.analysis.main.init_process_states(folders, global_params, logger, results, runner)
Create process states for valid DAG steps.
- logml.analysis.main.execute_analysis_dag_sequential(cfg, global_params, logger=None, errors='report', **kwargs) → int
Executes steps sequentially in a single process.
- logml.analysis.main.prepare_runner(cfg, folders, global_params, reset_dag=None, logger=None)
Create the runner and DAG, and load previous DAG states.
- logml.analysis.main.execute_analysis_dag(config_path=None, errors='report', **kwargs) → int
Entry point for complete DAG execution.
DAG steps are executed sequentially in topological order, which guarantees that all dependencies complete before the next step starts. Failed steps automatically propagate their status downstream, setting dependent steps to 'dependency failed'.
- Returns
0 if all steps succeeded, -1 if any step failed.
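A typical call of this entry point, with a placeholder config path, checking the documented return code:

```python
# The config path is a placeholder; return-code semantics per the docs above.
from logml.analysis.main import execute_analysis_dag

rc = execute_analysis_dag(config_path="configs/analysis.yaml", errors="report")
if rc != 0:
    raise SystemExit("at least one DAG step failed")
```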
- logml.analysis.main.execute_analysis_dag_step(config_path=None, item_names=None, job_completion_file=None, **kwargs) → int
Entry point for executing selected steps.
- Returns
0 if all steps succeeded, -1 if any step failed.
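A sketch of running a subset of steps; the step name and completion-file path are placeholders.

```python
# item_names and job_completion_file follow the signature documented above;
# the concrete values are placeholders.
from logml.analysis.main import execute_analysis_dag_step

rc = execute_analysis_dag_step(
    config_path="configs/analysis.yaml",
    item_names=["load_data"],                 # placeholder step name
    job_completion_file="step_done.marker",   # placeholder marker path
)
```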
- logml.analysis.main.estimate_data_size(cfg, global_params) → Tuple[pandas.core.frame.DataFrame, Dict[str, tuple]]
Load the dataframe and check its size.
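Given the annotated return type, a call unpacks the dataframe and a name-to-shape mapping; cfg and global_params are assumed to exist.

```python
# cfg and global_params assumed prepared elsewhere.
df, sizes = estimate_data_size(cfg, global_params)
print(df.shape, sizes)  # sizes maps names to tuples per the return annotation
```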
- logml.analysis.main.generate_analysis_dag(dump_dag_files=True, make_image=False, **kwargs) → Tuple[GlobalConfig, logml.analysis.common.JobsSchedule]
Generates the DAG, a new config, and a scheduler file.
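Example call using the documented defaults, unpacking the config and schedule per the annotated return type:

```python
from logml.analysis.main import generate_analysis_dag

cfg, schedule = generate_analysis_dag(dump_dag_files=True, make_image=False)
```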
- logml.analysis.main.describe_dag_state(run_name: Optional[str] = None, output_path: Optional[str] = None)
Summarize DAG state: execution and failure stats, etc.
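Example call; both arguments are optional per the signature, and the values here are placeholders.

```python
from logml.analysis.main import describe_dag_state

describe_dag_state(run_name="baseline", output_path="reports/dag_state")
```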
- logml.analysis.main.parse_arguments(global_params: dict)
Parses a given set of input arguments and returns arguments for the runner.