Important: Dask schedulers are experimental in Guild. The functionality outlined in this document is available as a pre-release in Guild 0.7.3.rc2 and later. To install the latest pre-release, run:
pip install --upgrade --pre guildai
Guild 0.7.3 introduces support for running operations in parallel using Dask.distributed. Dask-based scheduling is implemented by the built-in
dask:scheduler operation. A Dask scheduler is similar to a Guild queue. Unlike a queue, a Dask scheduler can run operations concurrently using a specified number of workers.
To start a Dask scheduler, run:
guild run dask:scheduler
This starts a scheduler, which waits for staged runs. When the scheduler sees a staged run, it starts the run provided the run's resource requirements are met. See Scheduler Resources below for more information.
The scheduler appears like any other run in Guild—e.g. you can show the scheduler by running
guild runs. To stop a scheduler, press Ctrl-C in the scheduler terminal or use the
stop command from another terminal:
guild stop -Fo dask:scheduler
dask:scheduler supports a number of flags, which are used to configure the scheduler.
|Flag|Description|
|---|---|
|workers|Number of workers in the Dask cluster|
|resources|Resources provided by the scheduler|
|dashboard-address|Address to listen on for dashboard connections (default is 8787)|
|loglevel|Log level used for the Dask scheduler (default is warn)|
|poll-interval|Minimum number of seconds between polls (default is 10)|
|run-once|Run all staged runs and stop (default is no)|
|wait-for-running|Wait for other runs to stop before starting staged runs (default is no)|
Use --help-op to view help for the operation:
guild run dask:scheduler --help-op
To use the Dask scheduler, install these Python packages:
- dask[distributed] (required)
- bokeh (optional—needed for the Dask dashboard; see below for details)
To install the minimal requirements, run:
pip install dask[distributed]
To install all requirements, including support for the Dask dashboard, run:
pip install dask[distributed] bokeh
To illustrate, we use a demo script that waits for a period of time and then exits. We use a Dask scheduler to run multiple such operations concurrently.
Create Demo Project
Create a new directory. This is your demo project.
Next, create a file named
demo.py in the project directory that contains this code:
import time, os

sleep = 5.0
run_id = os.getenv("RUN_ID")

print("Run %s sleeping for %f seconds" % (run_id, sleep))
time.sleep(sleep)
print("Run %s done" % run_id)

demo.py located in the project directory
Stage 10 runs using randomly chosen sleep periods from 1 to 10 seconds.
guild run demo.py sleep=[1:10] --trials 10 --stage-trials
This command stages 10
demo.py operations. You can view the staged runs by running:
guild runs --staged
Start a scheduler with the
run-once flag. This causes the scheduler to exit after all staged runs are completed.
guild run dask:scheduler run-once=yes workers=10
This command uses the Dask scheduler to run the staged runs. The scheduler uses 10 workers, which means it can run up to 10 operations concurrently. Note that the runs all start at the same time and finish according to their configured sleep periods.
You can start a Dask scheduler with one or more resources. A resource is a named quantity that is available to runs that require the resource. When a run requires a resource, it specifies the required quantity. If that quantity is not available to the scheduler, the run waits until the quantity is available. When started, the run temporarily depletes its required quantity from the scheduler. When the run finishes, it replenishes the depleted quantity. In this way, resources limit what is run by a scheduler to avoid resource exhaustion.
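The deplete/replenish accounting described above can be sketched in a few lines of Python. This is a hypothetical illustration of the semantics, not Guild's actual implementation:

```python
# Hypothetical sketch of scheduler resource accounting; names and
# structure here are illustrative, not Guild's implementation.
class ResourcePool:
    def __init__(self, **available):
        # e.g. ResourcePool(MEM=100e9, CPU=32)
        self.available = dict(available)

    def can_start(self, required):
        # A run starts only if every required quantity is available.
        return all(self.available.get(name, 0) >= qty
                   for name, qty in required.items())

    def start(self, required):
        # Starting a run temporarily depletes the pool.
        for name, qty in required.items():
            self.available[name] -= qty

    def finish(self, required):
        # Finishing a run replenishes the depleted quantities.
        for name, qty in required.items():
            self.available[name] += qty

pool = ResourcePool(MEM=100e9)
run = {"MEM": 50e9}
assert pool.can_start(run)
pool.start(run)
assert not pool.can_start({"MEM": 60e9})  # only 50e9 left
pool.finish(run)
assert pool.can_start({"MEM": 60e9})      # quantity replenished
```

The sketch shows why a run that requires more than the currently available quantity simply waits: nothing fails, the requirement is just rechecked as other runs finish.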
Resources can be used to constrain runs by available memory, GPUs, disk space or any other resource type including abstract resources like job size or user quotas.
Resources are specified using name/value pairs in the format
NAME=VALUE. Multiple resources are specified by separating the pairs with whitespace.
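For illustration, a resource spec in this format can be parsed with a few lines of Python. This is a sketch—parse_resources is a hypothetical helper, not part of Guild:

```python
def parse_resources(spec):
    """Parse a spec like 'MEM=100e9 CPU=32' into a dict of floats."""
    resources = {}
    for pair in spec.split():
        name, _, value = pair.partition("=")
        resources[name] = float(value)
    return resources

print(parse_resources("MEM=100e9 CPU=32"))
# {'MEM': 100000000000.0, 'CPU': 32.0}
```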
To specify resources that are available to a scheduler, use the scheduler’s
resources flag when starting the operation. For example, to make 100 GB of memory available to scheduled runs, you can start a scheduler like this:
guild run dask:scheduler resources=MEM=100e9
MEM in this case is arbitrary. You can use any name provided that runs use the same name when defining their required resources. See Required Resources below for more information.
To specify multiple resources, separate each name value pair with a space. In this case you must quote the command line argument. For example, to include a
CPU resource, run:
guild run dask:scheduler resources='MEM=100e9 CPU=32'
To constrain a run within available resources, use a tag that starts with
dask: and contains a valid resource spec. For example, to stage a run that requires 50 GB, run:
guild run <op> --stage --tag dask:MEM=50e9
To specify more than one required resource, you can use multiple tags or use space-separated name value pairs. The following two commands are equivalent:
guild run <op> --stage --tag dask:MEM=50e9 --tag dask:CPU=8
guild run <op> --stage --tag dask:'MEM=50e9 CPU=8'
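To illustrate the tag convention, here is a hypothetical sketch (not Guild's code) that collects resource requirements from dask: tags; it demonstrates that the two tag forms above are equivalent:

```python
def required_resources(tags):
    """Collect resource requirements from tags like 'dask:MEM=50e9'."""
    required = {}
    for tag in tags:
        if tag.startswith("dask:"):
            # A single tag may carry several space-separated pairs.
            for pair in tag[len("dask:"):].split():
                name, _, value = pair.partition("=")
                required[name] = float(value)
    return required

# Multiple tags and a single space-separated tag yield the same result.
assert (required_resources(["dask:MEM=50e9", "dask:CPU=8"])
        == required_resources(["dask:MEM=50e9 CPU=8"])
        == {"MEM": 50e9, "CPU": 8.0})
```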
Scheduling Runs on Specific GPUs
You can use scheduler resources to constrain runs to specific GPU devices. There are three parts to this scheme:
When starting a scheduler, define a resource for each GPU device. To ensure that only one run is started per GPU device, use the resource value 1. Use larger quantities as needed to support multiple runs per GPU device.
When staging a run, specify the target GPU devices using the applicable resource names.
To limit the GPU devices visible to a run, stage the run using the
--gpus option with the applicable device IDs.
To illustrate, consider a system with four GPUs with device IDs 0 through 3. To make all four GPUs available as scheduler resources that can each run at most one operation at a time, start a scheduler like this:
guild run dask:scheduler resources='GPU0=1 GPU1=1 GPU2=1 GPU3=1'
In this case, each named resource represents the corresponding GPU ID.
To stage an operation that uses
GPU0—i.e. the GPU device with ID 0—run:
guild run <op> --stage --tag dask:GPU0=1 --gpus 0
This command stages a run with a tag that indicates the run requires the
GPU0 resource. If the required quantity—in this case 1—is available, the scheduler starts the run. Otherwise the scheduler waits until the required quantity is available. When the operation starts, its
CUDA_VISIBLE_DEVICES environment variable is set to
0, per the
--gpus option. This ensures that the operation only sees the target GPU device.
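Inside the operation, you can confirm which devices are visible by inspecting the environment variable. A minimal sketch—CUDA frameworks read this variable themselves, so this check is purely informational:

```python
import os

# When the scheduler starts a run staged with --gpus 0, this
# variable is set to "0", so frameworks only see that device.
# It is unset when no GPU restriction applies.
visible = os.environ.get("CUDA_VISIBLE_DEVICES")
print("Visible GPU devices:", visible)
```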
Note: The use of
--gpus in this example is just one way to control GPU placement for a run. You can use other methods of assigning work to specific GPU devices, such as an operation flag combined with framework-specific code (e.g. TensorFlow's device context manager), or by setting the CUDA_VISIBLE_DEVICES environment variable directly.
By default, Guild's Dask-based scheduler starts the Dask dashboard for the underlying Dask cluster. This lets you view the Dask tasks for your runs, system status, and Dask logs.
To view the dashboard, you must install the
bokeh Python package. See Requirements above for more information.
Guild shows the dashboard link when starting the scheduler. By default, the dashboard runs on port
8787 on the local system. You can specify a different port or binding address using the scheduler's
dashboard-address flag.
For example, to run the dashboard on port 8888, run:
guild run dask:scheduler dashboard-address=8888
To include an IP address or hostname, you can include it in the address. For example, to run the dashboard on port 8888 of the loopback address, run:
guild run dask:scheduler dashboard-address=127.0.0.1:8888