calypso.dispatchers.orchestrator.orchestrator module
- class calypso.dispatchers.orchestrator.orchestrator.Orchestrator(machinelist: list[dict | Machine], timeinterval: int = 10, maxjobnumb: int = 10000, pickup: bool = False, tmppath: str = 'BackStage', **kwargs)
Bases: object
Dispatch jobs to the given machines. Each machine has a maximum number that limits how many jobs can run on it at once.
Examples
>>> with Orchestrator(
...     machine_list=[asdict(self.machine_data)],
...     time_internal=1,
...     max_job_numb=2,
...     pick_up=False,
... ) as orchestrator:
...     jobs = [Job(...), ...]
...     orchestrator.orchestrate(jobs, nowait=True)
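
The per-machine limit on running jobs can be pictured with the minimal sketch below. It is not the Orchestrator implementation: the function assign_jobs, the machine names, and the plain dict and deque structures are all hypothetical and only illustrate the idea of filling each machine up to its own cap while leftover jobs stay in the wait queue.

```python
from collections import deque


def assign_jobs(jobs, capacities, running):
    """Assign waiting jobs to machines that still have free slots.

    All names here are hypothetical; this is not the calypso API.
    jobs: deque of job identifiers waiting to run.
    capacities: dict mapping machine name -> max number of running jobs.
    running: dict mapping machine name -> list of jobs currently running.
    Returns the (machine, job) pairs assigned in this pass.
    """
    assigned = []
    for machine, limit in capacities.items():
        slots = running.setdefault(machine, [])
        # Fill this machine until it reaches its limit or the queue is empty.
        while jobs and len(slots) < limit:
            job = jobs.popleft()
            slots.append(job)
            assigned.append((machine, job))
    return assigned


waiting = deque(["job-0", "job-1", "job-2", "job-3"])
limits = {"machine-a": 2, "machine-b": 1}
running = {}
pairs = assign_jobs(waiting, limits, running)
```

With three slots in total and four jobs, one job remains in `waiting` until a running job finishes and frees a slot.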
- are_jobs_finished()
Print the job state and return True if all jobs have finished, otherwise False.
- as_dict()
Return the current job state as a dictionary.
- get_results(numb_of_results)
- get_results_done(optcontexts)
- handle_unexpected_state()
Handle jobs that are in an unexpected state.
- how_jobs_going(log=True)
Print the job state.
- orchestrate(jobs: list[Job] | Job | None = None, nowait=True)
Dispatch one job or a list of jobs into the queue and submit job groups to free machines. Each job group is a list of jobs, grouped according to FreeMachine.groupsize.
- pickup_from_old_submit() → None
Pick up the job state and the wait queue from an earlier submission if pick_up is True.
- serialize(to_file=True)
Save job_state to a local file.
- start()
- start_monitoring()
Start the worker and consumer threads.
- Returns:
thread_object
- Return type:
thread object
- stop()
- terminate_monitoring()
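
The grouping of jobs mentioned under orchestrate() can be illustrated with a short sketch. It assumes that a job group is simply a consecutive slice of the job list whose length is at most the machine's group size. The helper group_jobs and the string job identifiers are hypothetical, and the actual grouping logic in the library may differ.

```python
def group_jobs(jobs, group_size):
    """Split a list of jobs into consecutive groups of at most group_size.

    Hypothetical helper for illustration only; not part of calypso.
    """
    if group_size < 1:
        raise ValueError("group_size must be a positive integer")
    # Step through the list in strides of group_size and slice out each group.
    return [jobs[i:i + group_size] for i in range(0, len(jobs), group_size)]


groups = group_jobs(["j0", "j1", "j2", "j3", "j4"], 2)
```

The last group may be shorter than the others when the number of jobs is not a multiple of the group size.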