calypso.dispatchers.orchestrator package
- class calypso.dispatchers.orchestrator.Orchestrator(machine_list: list[dict | Machine], time_interval: int = 10, max_job_numb: int = 10000, pick_up: bool = False, tmp_path: str = 'BackStage', **kwargs)
Bases: object
Dispatch jobs to the given machines. Each machine has a maximum number of jobs that may run on it at the same time.
Examples
>>> with Orchestrator(
...     machine_list=[asdict(self.machine_data)],
...     time_interval=1,
...     max_job_numb=2,
...     pick_up=False,
... ) as orchestrator:
...     jobs = [Job(...), ...]
...     orchestrator.orchestrate(jobs, nowait=True)
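The per-machine limit on running jobs can be pictured with a small standalone sketch. It does not use the calypso API: the `dispatch` function, the machine names, and the job labels are hypothetical and only illustrate the idea of filling each machine up to its limit while the remaining jobs stay in the wait queue.

```python
from collections import deque


def dispatch(waiting: deque, running: dict, capacity: dict) -> None:
    """Move jobs from the wait queue onto machines that still have free slots.

    Hypothetical helper, not part of calypso: `capacity` maps a machine name
    to the maximum number of jobs allowed to run on it at once.
    """
    for name, limit in capacity.items():
        slots = running.setdefault(name, [])
        # Fill this machine until it is full or the queue is empty.
        while waiting and len(slots) < limit:
            slots.append(waiting.popleft())


waiting = deque(["job-0", "job-1", "job-2", "job-3"])
running = {}
dispatch(waiting, running, {"machine-a": 2, "machine-b": 1})

print(running)  # {'machine-a': ['job-0', 'job-1'], 'machine-b': ['job-2']}
print(waiting)  # deque(['job-3'])
```

In this sketch the job left in `waiting` would be picked up by a later call to `dispatch` once a slot is freed.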
- are_jobs_finished()
Print the job state and return True if all jobs have finished, otherwise False.
- as_dict()
Return the current job state as a dictionary.
- get_results(numb_of_results)
- get_results_done(optcontexts)
- handle_unexpected_state()
Handle jobs that are in an unexpected state.
- how_jobs_going(log=True)
Print the job state.
- orchestrate(jobs: list[Job] | Job | None = None, nowait=True)
Dispatch one job or a batch of jobs into the queue and submit job groups to free machines. A job group is a list of jobs grouped according to FreeMachine.groupsize.
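The grouping step can be sketched independently of the package. The `group_jobs` helper below is hypothetical and is not taken from calypso; it only shows one plausible way to split a job list into consecutive groups of a fixed size, with the last group possibly smaller.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def group_jobs(jobs: Iterable[T], group_size: int) -> Iterator[list]:
    """Yield consecutive groups of at most `group_size` jobs (hypothetical helper)."""
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    it = iter(jobs)
    while True:
        group = list(islice(it, group_size))
        if not group:
            return
        yield group


groups = list(group_jobs(["job-0", "job-1", "job-2", "job-3", "job-4"], group_size=2))
print(groups)  # [['job-0', 'job-1'], ['job-2', 'job-3'], ['job-4']]
```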
- pickup_from_old_submit() → None
Restore the job state and the wait queue from a previous submission when pick_up is True.
- serialize(to_file=True)
Save job_state to a local file.
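Saving the job state to a file and reading it back later can be illustrated with a generic round trip. This is a sketch under assumptions: the on-disk format used by the package is not stated here, so JSON, the file name `job_state.json`, and the helpers `save_state` and `load_state` are all hypothetical.

```python
import json
import tempfile
from pathlib import Path


def save_state(state: dict, path: Path) -> None:
    """Write the state to a temporary file, then rename it over the target.

    Hypothetical helper: the rename keeps a reader from seeing a half-written file.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    tmp.replace(path)


def load_state(path: Path) -> dict:
    """Return the saved state, or an empty dict when nothing was saved yet."""
    if not path.exists():
        return {}
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as workdir:
    state_file = Path(workdir) / "job_state.json"  # assumed file name
    state = {"job-0": "finished", "job-1": "running"}
    save_state(state, state_file)
    restored = load_state(state_file)
    missing = load_state(Path(workdir) / "absent.json")

print(restored)  # {'job-0': 'finished', 'job-1': 'running'}
print(missing)   # {}
```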
- start()
- start_monitoring()
Start the worker and consumer threads.
- Returns:
thread_object
- Return type:
thread object
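A worker/consumer thread arrangement of the kind mentioned above can be sketched with the standard library alone. The example is generic and makes no claim about how Orchestrator is implemented: `run_pipeline`, the doubling "work", and the use of `None` as a stop sentinel are assumptions chosen for illustration.

```python
import queue
import threading


def run_pipeline(items, n_workers=2):
    """Process items in worker threads and collect results in a consumer thread."""
    todo = queue.Queue()
    done = queue.Queue()
    results = []

    def worker():
        while True:
            item = todo.get()
            if item is None:  # sentinel: no more work for this worker
                break
            done.put(item * 2)  # stand-in for running a job

    def consumer():
        while True:
            result = done.get()
            if result is None:  # sentinel: all workers have finished
                break
            results.append(result)

    workers = [threading.Thread(target=worker, daemon=True) for _ in range(n_workers)]
    consumer_thread = threading.Thread(target=consumer, daemon=True)
    for t in workers:
        t.start()
    consumer_thread.start()

    for item in items:
        todo.put(item)
    for _ in workers:  # one sentinel per worker
        todo.put(None)
    for t in workers:
        t.join()
    done.put(None)  # stop the consumer only after every worker is done
    consumer_thread.join()
    return sorted(results)


print(run_pipeline([1, 2, 3]))  # [2, 4, 6]
```

Stopping the consumer only after all workers have joined ensures that no result is dropped, which is one reason a sentinel per thread is a common shutdown pattern.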
- stop()
- terminate_monitoring()
Subpackages
- calypso.dispatchers.orchestrator.executor package
- calypso.dispatchers.orchestrator.scheduler package
get_scheduler()
- Submodules
Submodules
- calypso.dispatchers.orchestrator.jobs module
Job
Job.init_job_status()
Job.command
Job.comment
Job.download_files
Job.job_id
Job.local_root
Job.local_work_dir
Job.machine_id
Job.machine_name
Job.max_retry
Job.name
Job.remote_log_file
Job.remote_root
Job.remote_running_script
Job.remote_work_dir
Job.retry
Job.status
Job.submit_batch
Job.upload_files
Job.upload_share_files
JobStatus
- calypso.dispatchers.orchestrator.log module
- calypso.dispatchers.orchestrator.machine module
Machine
MachineData
MachineData.from_kwargs()
MachineData.as_dict()
MachineData.additional_head_setting
MachineData.command
MachineData.envs
MachineData.executor
MachineData.external_command
MachineData.group_size
MachineData.host
MachineData.key_filename
MachineData.kwargs
MachineData.machine_capacity
MachineData.max_retry
MachineData.max_run_time
MachineData.name
MachineData.numb_cpu_per_node
MachineData.numb_node
MachineData.password
MachineData.port
MachineData.queue
MachineData.remote_root
MachineData.scheduler
MachineData.scheduler_env
MachineData.username
- calypso.dispatchers.orchestrator.orchestrator module
Orchestrator
Orchestrator.are_jobs_finished()
Orchestrator.as_dict()
Orchestrator.get_results()
Orchestrator.get_results_done()
Orchestrator.graceful_monitoring()
Orchestrator.handle_unexpected_state()
Orchestrator.how_jobs_going()
Orchestrator.monitoring()
Orchestrator.orchestrate()
Orchestrator.pickup_from_old_submit()
Orchestrator.serialize()
Orchestrator.start()
Orchestrator.start_monitoring()
Orchestrator.stop()
Orchestrator.terminate_monitoring()
- calypso.dispatchers.orchestrator.toolkit module