calypso.dispatchers.orchestrator package

class calypso.dispatchers.orchestrator.Orchestrator(machine_list: list[dict | Machine], time_interval: int = 10, max_job_numb: int = 10000, pick_up: bool = False, tmppath: str = 'BackStage', **kwargs)

Bases: object

Dispatch jobs to the given machines. Each machine has a maximum job number that limits how many jobs it runs at once.

Examples

>>> with Orchestrator(
...     machine_list=[asdict(self.machine_data)],
...     time_interval=1,
...     max_job_numb=2,
...     pick_up=False,
... ) as orchestrator:
...     jobs = [Job(...), ...]
...     orchestrator.orchestrate(jobs, nowait=True)

are_jobs_finished()

Print the job state and return True if all jobs have finished, otherwise False.

as_dict()

Return the current job state as a dictionary.

get_results(numb_of_results)
get_results_done(optcontexts)
graceful_monitoring(event: Event)
handle_unexpected_state()

Handle jobs that are in an unexpected state.

how_jobs_going(log=True)

Print the job state.

monitoring(event: Event)
orchestrate(jobs: list[Job] | Job | None = None, nowait=True)

Dispatch one job or a list of jobs into the queue and submit job groups to free machines. A job group is a list of jobs grouped according to FreeMachine.groupsize.

Parameters:

jobs (list of Job, or a single Job) – Job(s) to be dispatched.
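
The grouping step described for orchestrate() can be pictured with a small standalone sketch. It is not the package's implementation: the helper name group_jobs, its group_size argument, and the string job identifiers are all hypothetical, and it only assumes that a job group is a consecutive slice of the submitted jobs.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def group_jobs(jobs: Sequence[T], group_size: int) -> list[list[T]]:
    """Split jobs into consecutive groups of at most group_size items.

    Hypothetical helper: the real grouping logic of Orchestrator may differ.
    """
    if group_size < 1:
        raise ValueError("group_size must be a positive integer")
    return [list(jobs[i:i + group_size]) for i in range(0, len(jobs), group_size)]


# Hypothetical job identifiers standing in for Job objects.
job_groups = group_jobs(["job0", "job1", "job2", "job3", "job4"], group_size=2)
print(job_groups)  # [['job0', 'job1'], ['job2', 'job3'], ['job4']]
```

The last group may be shorter than the others when the number of jobs is not a multiple of the group size.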

pickup_from_old_submit() → None

Pick up the job state and the wait queue from a previous submission when pick_up is True.

serialize(to_file=True)

Save job_state to a local file.
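
As an illustration of how saving a job state and picking it up later could fit together, here is a minimal sketch. It assumes the state is a JSON-serializable dictionary; the function names, the file name job_state.json, and the state values are hypothetical and are not taken from the package, whose on-disk format is not documented here.

```python
import json
import tempfile
from pathlib import Path


def save_job_state(job_state: dict, path: Path) -> None:
    """Write the state to a sibling temporary file, then rename it into place.

    Hypothetical helper: writing to a temporary file first avoids leaving a
    half-written state file behind if the process is interrupted.
    """
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(job_state, indent=2))
    tmp.replace(path)


def load_job_state(path: Path) -> dict:
    """Read a previously saved state back into a dictionary."""
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as workdir:
    state_file = Path(workdir) / "job_state.json"  # hypothetical file name
    state = {"job0": "finished", "job1": "running"}  # hypothetical state values
    save_job_state(state, state_file)
    restored = load_job_state(state_file)
```

After the round trip, restored holds the same mapping as state, which is the property a pick-up step relies on.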

start()
start_monitoring()

Start the worker and consumer threads.

Returns:

thread_object

Return type:

thread object
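
The worker/consumer pattern with an Event-based shutdown can be sketched with the standard library alone. This is only an assumed shape, suggested by the event: Event parameters of monitoring() and graceful_monitoring(); the function names, queues, and result strings below are hypothetical and do not reflect the package's internals.

```python
import queue
import threading


def worker(pending: queue.Queue, done: queue.Queue, stop: threading.Event) -> None:
    """Move items from pending to done until asked to stop."""
    while not stop.is_set():
        try:
            item = pending.get(timeout=0.05)
        except queue.Empty:
            continue
        done.put(f"{item}:finished")
        pending.task_done()


def consumer(done: queue.Queue, results: list, stop: threading.Event) -> None:
    """Collect finished items; keep draining after stop is set until done is empty."""
    while not stop.is_set() or not done.empty():
        try:
            results.append(done.get(timeout=0.05))
        except queue.Empty:
            continue


pending, done, results = queue.Queue(), queue.Queue(), []
stop = threading.Event()
threads = [
    threading.Thread(target=worker, args=(pending, done, stop), daemon=True),
    threading.Thread(target=consumer, args=(done, results, stop), daemon=True),
]
for t in threads:
    t.start()
for name in ("job0", "job1"):  # hypothetical job identifiers
    pending.put(name)
pending.join()  # wait until the worker has processed every item
stop.set()      # ask both threads to exit
for t in threads:
    t.join()
```

Setting the event only after pending.join() returns lets the consumer drain every finished item before it exits, which is the kind of graceful shutdown an Event makes possible.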

stop()
terminate_monitoring()

Subpackages

Submodules