prefect-gcp makes it easy to leverage the capabilities of Google Cloud Platform (GCP) in your flows, featuring support for Vertex AI, Cloud Run, BigQuery, Cloud Storage, and Secret Manager.
Create a short script, replacing the placeholders with your information.
```python
from prefect_gcp import GcpCredentials

# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
    "type": "service_account",
    "project_id": "PROJECT_ID",
    "private_key_id": "KEY_ID",
    "private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
    "client_email": "SERVICE_ACCOUNT_EMAIL",
    "client_id": "CLIENT_ID",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://accounts.google.com/o/oauth2/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}

GcpCredentials(
    service_account_info=service_account_info
).save("BLOCK-NAME-PLACEHOLDER")
```
service_account_info vs service_account_file
The advantage of using service_account_info instead of service_account_file is that the credentials are stored in the block itself, so they are accessible across containers.
If service_account_file is used, the provided file path must be available in the container executing the flow.
Congrats! You can now easily load the saved block, which holds your credentials:
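```python
from prefect_gcp import GcpCredentials

GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
```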
Is your local computer or server running out of memory or taking too long to complete a job?
prefect_gcp offers a solution by enabling you to execute your Prefect flows remotely, on demand, through Google Cloud Run.
The following code snippets demonstrate how prefect_gcp can be used to run a job on Cloud Run, either as part of a Prefect deployment's infrastructure or within a flow.
First, find an existing image within the Google Artifact Registry. Ensure it has Python and prefect-gcp[cloud_storage] installed, or follow the instructions below to set one up.
Save a custom infrastructure and storage block by executing the following snippet.
```python
import os

from prefect_gcp import GcpCredentials, CloudRunJob, GcsBucket

gcp_credentials = GcpCredentials.load(os.environ["CREDENTIALS_BLOCK_NAME"])

# must be from GCR and have Python + Prefect
image = f"us-docker.pkg.dev/{os.environ['GCP_PROJECT_ID']}/test-example-repository/prefect-gcp:2-python3.11"  # noqa

cloud_run_job = CloudRunJob(
    image=image,
    credentials=gcp_credentials,
    region=os.environ["CLOUD_RUN_JOB_REGION"],
)
cloud_run_job.save(os.environ["CLOUD_RUN_JOB_BLOCK_NAME"], overwrite=True)

bucket_name = "cloud-run-job-bucket"
cloud_storage_client = gcp_credentials.get_cloud_storage_client()
cloud_storage_client.create_bucket(bucket_name)
gcs_bucket = GcsBucket(
    bucket=bucket_name,
    gcp_credentials=gcp_credentials,
)
gcs_bucket.save(os.environ["GCS_BUCKET_BLOCK_NAME"], overwrite=True)
```
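Then, define a flow and create a deployment that runs it on Cloud Run using the saved blocks. The snippet below is a minimal sketch assuming the Prefect 2 Deployment.build_from_flow API, the environment variables from the previous snippet, and a placeholder deployment name:

```python
import os

from prefect import flow, get_run_logger
from prefect.deployments import Deployment
from prefect_gcp import CloudRunJob, GcsBucket

@flow
def cloud_run_job_flow():
    logger = get_run_logger()
    logger.info("Hello, Prefect!")

if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=cloud_run_job_flow,
        name="cloud-run-deployment",
        # load the infrastructure and storage blocks saved above
        infrastructure=CloudRunJob.load(os.environ["CLOUD_RUN_JOB_BLOCK_NAME"]),
        storage=GcsBucket.load(os.environ["GCS_BUCKET_BLOCK_NAME"]),
    )
    deployment.apply()
```

With an agent polling for this deployment, trigger a run from the Prefect UI or CLI.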
Once the flow run has completed, you will see Hello, Prefect! logged in the Prefect UI.
No class found for dispatch key
If you encounter an error message like KeyError: "No class found for dispatch key 'cloud-run-job' in registry for type 'Block'.",
ensure prefect-gcp is installed in the environment in which your agent is running.
prefect_gcp also enables you to execute your Prefect flows remotely, on demand, using Google Vertex AI!
Be sure to additionally install the AI Platform extra!
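For example, with pip (assuming the extra is named aiplatform):

```bash
pip install "prefect-gcp[aiplatform]"
```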
Setting up a Vertex AI job is very similar to setting up a Cloud Run job; just swap CloudRunJob for VertexAICustomTrainingJob, as in the following snippet.
```python
from prefect_gcp import GcpCredentials, VertexAICustomTrainingJob, GcsBucket

gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

vertex_ai_job = VertexAICustomTrainingJob(
    image="IMAGE-NAME-PLACEHOLDER",  # must be from GCR and have Python + Prefect
    credentials=gcp_credentials,
    region="us-central1",
)
vertex_ai_job.save("test-example")
```
Cloud Run Job vs Vertex AI
With Vertex AI, you can allocate computational resources on-the-fly for your executions, much like Cloud Run.
However, unlike Cloud Run, you have the flexibility to provision instances with higher CPU, GPU, TPU, and RAM capacities.
Additionally, jobs can run for up to 7 days, which is significantly longer than the maximum duration allowed on Cloud Run.
Got big data in BigQuery? prefect_gcp lets you steadily stream data from, and write to, Google BigQuery within your Prefect flows!
Be sure to install prefect-gcp with the BigQuery extra!
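For example, with pip (assuming the extra is named bigquery):

```bash
pip install "prefect-gcp[bigquery]"
```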
The provided code snippet shows how you can use prefect_gcp to create a new dataset in BigQuery, define a table, insert rows, and fetch data from the table.
```python
from prefect import flow
from prefect_gcp.bigquery import GcpCredentials, BigQueryWarehouse

@flow
def bigquery_flow():
    all_rows = []
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

    client = gcp_credentials.get_bigquery_client()
    client.create_dataset("test_example", exists_ok=True)

    with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
        warehouse.execute(
            "CREATE TABLE IF NOT EXISTS test_example.customers "
            "(name STRING, address STRING);"
        )
        warehouse.execute_many(
            "INSERT INTO test_example.customers (name, address) "
            "VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = warehouse.fetch_many(
                "SELECT * FROM test_example.customers", size=2
            )
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows

bigquery_flow()
```
With prefect_gcp, your Prefect flows can not only seamlessly upload objects to and download objects from Google Cloud Storage, but also log each of these actions.
Be sure to additionally install prefect-gcp with the Cloud Storage extra!
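For example, with pip (the cloud_storage extra name matches the image requirement mentioned earlier):

```bash
pip install "prefect-gcp[cloud_storage]"
```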
The provided code snippet shows how you can use prefect_gcp to upload a file to a Google Cloud Storage bucket and download the same file under a different file name.
```python
from pathlib import Path

from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket

@flow
def cloud_storage_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcs_bucket = GcsBucket(
        bucket="BUCKET-NAME-PLACEHOLDER",
        gcp_credentials=gcp_credentials,
    )

    gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
    downloaded_file_path = gcs_bucket.download_object_to_path(
        gcs_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()

cloud_storage_flow()
```
Upload and download directories
GcsBucket supports uploading and downloading entire directories. To view examples, check out the Examples Catalog!
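As a minimal sketch (assuming the upload_from_folder and download_folder_to_path methods and a local directory named data):

```python
from prefect_gcp import GcpCredentials, GcsBucket

gcs_bucket = GcsBucket(
    bucket="BUCKET-NAME-PLACEHOLDER",
    gcp_credentials=GcpCredentials.load("BLOCK-NAME-PLACEHOLDER"),
)

# upload every file under the local "data" directory to the bucket
gcs_bucket.upload_from_folder("data")

# download that bucket folder back to a different local directory
gcs_bucket.download_folder_to_path("data", "downloaded-data")
```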
Do you already have secrets available on Google Secret Manager? There's no need to migrate them!
prefect_gcp allows you to read and write secrets with Google Secret Manager within your Prefect flows.
Be sure to install prefect-gcp with the Secret Manager extra!
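For example, with pip (assuming the extra is named secret_manager):

```bash
pip install "prefect-gcp[secret_manager]"
```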
The provided code snippet shows how you can use prefect_gcp to write a secret to the Secret Manager, read the secret data, delete the secret, and finally return the secret data.
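A minimal sketch of that workflow, using the GcpSecret block with a placeholder secret name:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.secret_manager import GcpSecret

@flow
def secret_manager_flow():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcp_secret = GcpSecret(
        secret_name="test-example",
        gcp_credentials=gcp_credentials,
    )
    # write the secret, read it back, then clean up
    gcp_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = gcp_secret.read_secret()
    gcp_secret.delete_secret()
    return secret_data

secret_manager_flow()
```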
Accessing Google credentials or clients from GcpCredentials
If prefect-gcp is missing a feature, feel free to submit an issue.
In the meantime, you may want to access the underlying Google Cloud credentials or clients, which prefect-gcp exposes via the GcpCredentials block.
The provided code snippet shows how you can use prefect_gcp to instantiate a Google Cloud client, like bigquery.Client.
Note that a GcpCredentials object is not a valid input to the underlying BigQuery client; use the get_credentials_from_service_account method to retrieve an actual google.auth.Credentials object and pass that instead.
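For example, a sketch of instantiating bigquery.Client with credentials pulled from a saved GcpCredentials block:

```python
import google.cloud.bigquery
from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    # retrieve the underlying google.auth.Credentials object
    google_auth_credentials = gcp_credentials.get_credentials_from_service_account()
    # pass it to the vanilla BigQuery client
    bigquery_client = google.cloud.bigquery.Client(credentials=google_auth_credentials)
    return bigquery_client

create_bigquery_client()
```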