Upgrading from agents to workers significantly enhances the experience of deploying flows. It simplifies the specification of each flow's infrastructure and runtime environment.
A worker is the fusion of an agent with an infrastructure block. Like agents, workers poll a work pool for flow runs that are scheduled to start. Like infrastructure blocks, workers are typed - they work with only one kind of infrastructure, and they specify the default configuration for jobs submitted to that infrastructure.
Accordingly, workers are not a drop-in replacement for agents. Using workers requires deploying flows differently. In particular, deploying a flow with a worker does not involve specifying an infrastructure block. Instead, infrastructure configuration is specified on the work pool and passed to each worker that polls work from that pool.
This guide provides an overview of the differences between agents and workers. It also describes how to upgrade from agents to workers in just a few quick steps.
Key enhancements of workers and work pools include:

- Work pools allow greater customization and governance of infrastructure parameters for deployments via their base job template.
- Prefect Cloud push work pools enable flow execution in your cloud provider environment without needing to host a worker.
- Prefect Cloud managed work pools let you run flows on Prefect's infrastructure without needing to host a worker or configure cloud provider infrastructure.
- The Python deployment experience with .deploy() and the alternative YAML-based experience with prefect.yaml are more flexible and easier to use than block- and agent-based deployments.
When using the YAML-based deployment API, you can configure a pull action in your prefect.yaml file to specify how to retrieve flow code for your deployments. You can use configuration from your existing storage blocks to define your pull action via templating.
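For example, a pull action that clones your repo might look like the sketch below; the repository URL, branch, and the referenced GitHubCredentials block name (my-github-credentials) are illustrative placeholders, not values from this guide:

```yaml
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/org/demo-repo.git
      branch: main
      # template in configuration from an existing credentials block
      credentials: "{{ prefect.blocks.github-credentials.my-github-credentials }}"
```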
When using the Python deployment API, you can pass any storage block to the source argument of flow.from_source to specify how to retrieve flow code for your deployment.
Create a work pool
This new work pool will replace your infrastructure block.
You can use the .publish_as_work_pool method on any infrastructure block to create a work pool with the same configuration.
For example, if you have a KubernetesJob infrastructure block named 'my-k8s-job', you can create a work pool with the same configuration with this script:
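```python
from prefect.infrastructure import KubernetesJob

# Load the existing infrastructure block and publish it as a work pool
# with the same name and configuration.
KubernetesJob.load("my-k8s-job").publish_as_work_pool()
```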
Running this script will create a work pool named 'my-k8s-job' with the same configuration as your infrastructure block.
Serving flows
If you are using a Process infrastructure block and a LocalFilesystem storage block (or aren't using an infrastructure and storage block at all), you can use flow.serve to create a deployment without needing to specify a work pool name or start a worker.
This is a quick way to create a deployment for a flow and is a great way to manage your deployments if you don't need the dynamic infrastructure creation or configuration offered by workers.
Check out our Docker guide for how to build a served flow into a Docker image and host it in your environment.
Start a worker
This worker will replace your agent and poll your new work pool for flow runs to execute.
```
prefect worker start -p <work-pool-name>
```
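If your new work pool is Kubernetes-typed, the matching worker is provided by the separate prefect-kubernetes package, so starting it might look like this (the pool name assumes the work pool created above):

```
pip install prefect-kubernetes
prefect worker start -p "my-k8s-job"
```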
Deploy your flows to the new work pool
To deploy your flows to the new work pool, you can use flow.deploy for a Pythonic deployment experience or prefect deploy for a YAML-based deployment experience.
If you currently use Deployment.build_from_flow, we recommend using flow.deploy.
If you currently use prefect deployment build and prefect deployment apply, we recommend using prefect deploy.
flow.from_source will load your flow from a remote storage location and make it deployable. Your existing storage block can be passed to the source argument of flow.from_source.
Below are some examples of how to translate Deployment.build_from_flow to flow.serve or flow.deploy.
```python
from prefect import flow
from prefect.deployments import Deployment

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
```
You can replace Deployment.build_from_flow with flow.serve:
```python
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")

if __name__ == "__main__":
    my_flow.serve(
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
```
This will start a process that serves your flow and executes any flow runs that are scheduled to start.
If you currently use a storage block to load your flow code but no infrastructure block:
```python
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        parameters=dict(name="Marvin"),
    )
```
you can use flow.from_source to load your flow from the same location and flow.serve to create a deployment:
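For example, assuming the flow is defined in example.py in that repo (the entrypoint below is an assumption, matching the Kubernetes example later in this guide):

```python
from prefect import flow
from prefect.filesystems import GitHub

if __name__ == "__main__":
    flow.from_source(
        source=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
    ).serve(
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
```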
This will allow you to execute scheduled flow runs without starting a worker. Additionally, the process serving your flow will regularly check for updates to your flow code and automatically update the flow if it detects any changes to the code.
Deploying using an infrastructure and storage block
To migrate the deployment below, we'll need to create a work pool from our infrastructure block and pass its name to flow.deploy as the work_pool_name argument. We'll also need to pass our storage block to flow.from_source as the source argument.
```python
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub
from prefect.infrastructure.kubernetes import KubernetesJob

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
        infrastructure=KubernetesJob.load("my-k8s-job"),
        infra_overrides=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
```
The equivalent deployment code using flow.deploy would look like this:
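```python
from prefect import flow
from prefect.filesystems import GitHub

if __name__ == "__main__":
    flow.from_source(
        source=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        # the work pool created earlier from the KubernetesJob block
        work_pool_name="my-k8s-job",
        # infra_overrides from the block-based deployment become job_variables
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
```

Note that the infra_overrides from the block-based deployment are passed as job_variables, and the work pool name matches the pool created from the KubernetesJob block with .publish_as_work_pool.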
If you currently bake your flow code into a Docker image before deploying, you can use the image argument of flow.deploy to build a Docker image as part of your deployment process:
```python
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Docker image!")

if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",
        image="my-repo/my-image:latest",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
```
You can skip a flow.from_source call when building an image with flow.deploy. Prefect will keep track of the flow's source code location in the image and load it from that location when the flow is executed.
Always run prefect deploy commands from the root level of your repo!
With agents, you might have had multiple deployment.yaml files; with workers, each repo has a single prefect.yaml file at its root containing the deployment configuration for all flows in that repo.
To set up a new prefect.yaml file for your deployments, run the following command from the root level of your repo:
```
prefect deploy
```
This will start a wizard that will guide you through setting up your deployment.
In step 4 of the wizard, select y on the last prompt to save the configuration for the deployment.
Saving the configuration for your deployment will result in a prefect.yaml file populated with your first deployment. You can use this YAML file to edit and define multiple deployments for this repo.
You can add more deployments to the deployments list in your prefect.yaml file and/or by continuing to use the deployment creation wizard.
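For instance, a prefect.yaml defining two deployments might look roughly like this (the deployment names, entrypoints, and parameters are illustrative placeholders):

```yaml
deployments:
  - name: my-deployment
    entrypoint: example.py:my_flow
    parameters:
      name: Marvin
    work_pool:
      name: my-k8s-job
      job_variables:
        pull_policy: Never
  - name: my-other-deployment
    entrypoint: flows/other.py:other_flow
    work_pool:
      name: my-k8s-job
```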