This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 6d6588f Add Google Cloud Workflows Operators (#13366) 6d6588f is described below commit 6d6588fe2b8bb5fa33e930646d963df3e0530f23 Author: Tomek Urbaszek <turbas...@gmail.com> AuthorDate: Thu Jan 28 20:35:09 2021 +0100 Add Google Cloud Workflows Operators (#13366) Add Google Cloud Workflows Operators, system test, example and sensor Co-authored-by: Tobiasz Kędzierski <tobiasz.kedzier...@polidea.com> --- .../google/cloud/example_dags/example_workflows.py | 197 ++++++ airflow/providers/google/cloud/hooks/workflows.py | 401 ++++++++++++ .../providers/google/cloud/operators/workflows.py | 714 +++++++++++++++++++++ .../providers/google/cloud/sensors/workflows.py | 123 ++++ airflow/providers/google/provider.yaml | 14 + .../operators/cloud/workflows.rst | 185 ++++++ setup.py | 2 + .../providers/google/cloud/hooks/test_workflows.py | 256 ++++++++ .../google/cloud/operators/test_workflows.py | 383 +++++++++++ .../cloud/operators/test_workflows_system.py | 29 + .../google/cloud/sensors/test_workflows.py | 108 ++++ .../google/cloud/utils/gcp_authenticator.py | 1 + 12 files changed, 2413 insertions(+) diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/airflow/providers/google/cloud/example_dags/example_workflows.py new file mode 100644 index 0000000..0fab435 --- /dev/null +++ b/airflow/providers/google/cloud/example_dags/example_workflows.py @@ -0,0 +1,197 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os + +from airflow import DAG +from airflow.providers.google.cloud.operators.workflows import ( + WorkflowsCancelExecutionOperator, + WorkflowsCreateExecutionOperator, + WorkflowsCreateWorkflowOperator, + WorkflowsDeleteWorkflowOperator, + WorkflowsGetExecutionOperator, + WorkflowsGetWorkflowOperator, + WorkflowsListExecutionsOperator, + WorkflowsListWorkflowsOperator, + WorkflowsUpdateWorkflowOperator, +) +from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor +from airflow.utils.dates import days_ago + +LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1") +PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id") + +WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow") + +# [START how_to_define_workflow] +WORKFLOW_CONTENT = """ +- getCurrentTime: + call: http.get + args: + url: https://us-central1-workflowsample.cloudfunctions.net/datetime + result: currentTime +- readWikipedia: + call: http.get + args: + url: https://en.wikipedia.org/w/api.php + query: + action: opensearch + search: ${currentTime.body.dayOfTheWeek} + result: wikiResult +- returnResult: + return: ${wikiResult.body[1]} +""" + +WORKFLOW = { + "description": "Test workflow", + "labels": {"airflow-version": "dev"}, + "source_contents": WORKFLOW_CONTENT, +} +# [END how_to_define_workflow] + +EXECUTION = {"argument": ""} + +SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow") +SLEEP_WORKFLOW_CONTENT = """ +- someSleep: + call: sys.sleep + args: + seconds: 120 +""" + +SLEEP_WORKFLOW = { + 
"description": "Test workflow", + "labels": {"airflow-version": "dev"}, + "source_contents": SLEEP_WORKFLOW_CONTENT, +} + + +with DAG("example_cloud_workflows", start_date=days_ago(1), schedule_interval=None) as dag: + # [START how_to_create_workflow] + create_workflow = WorkflowsCreateWorkflowOperator( + task_id="create_workflow", + location=LOCATION, + project_id=PROJECT_ID, + workflow=WORKFLOW, + workflow_id=WORKFLOW_ID, + ) + # [END how_to_create_workflow] + + # NOTE(review): WorkflowsUpdateWorkflowOperator fetches the workflow and passes it back to + # update_workflow unmodified, so this task does not change the name or description - confirm + # that this example is meant to be a no-op update. + # [START how_to_update_workflow] + update_workflows = WorkflowsUpdateWorkflowOperator( + task_id="update_workflows", + location=LOCATION, + project_id=PROJECT_ID, + workflow_id=WORKFLOW_ID, + update_mask={"paths": ["name", "description"]}, + ) + # [END how_to_update_workflow] + + # [START how_to_get_workflow] + get_workflow = WorkflowsGetWorkflowOperator( + task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID + ) + # [END how_to_get_workflow] + + # [START how_to_list_workflows] + list_workflows = WorkflowsListWorkflowsOperator( + task_id="list_workflows", + location=LOCATION, + project_id=PROJECT_ID, + ) + # [END how_to_list_workflows] + + # [START how_to_delete_workflow] + delete_workflow = WorkflowsDeleteWorkflowOperator( + task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID + ) + # [END how_to_delete_workflow] + + # [START how_to_create_execution] + create_execution = WorkflowsCreateExecutionOperator( + task_id="create_execution", + location=LOCATION, + project_id=PROJECT_ID, + execution=EXECUTION, + workflow_id=WORKFLOW_ID, + ) + # [END how_to_create_execution] + + # [START how_to_wait_for_execution] + wait_for_execution = WorkflowExecutionSensor( + task_id="wait_for_execution", + location=LOCATION, + project_id=PROJECT_ID, + 
workflow_id=WORKFLOW_ID, + execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}', + ) + # [END how_to_wait_for_execution] + + # [START how_to_get_execution] + 
get_execution = WorkflowsGetExecutionOperator( + task_id="get_execution", + location=LOCATION, + project_id=PROJECT_ID, + workflow_id=WORKFLOW_ID, + execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}', + ) + # [END how_to_get_execution] + + # [START how_to_list_executions] + list_executions = WorkflowsListExecutionsOperator( + task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID + ) + # [END how_to_list_executions] + + # A second workflow that only sleeps for 120 seconds, so that (presumably) its execution is + # still running when cancel_execution tries to cancel it. + create_workflow_for_cancel = WorkflowsCreateWorkflowOperator( + task_id="create_workflow_for_cancel", + location=LOCATION, + project_id=PROJECT_ID, + workflow=SLEEP_WORKFLOW, + workflow_id=SLEEP_WORKFLOW_ID, + ) + + create_execution_for_cancel = WorkflowsCreateExecutionOperator( + task_id="create_execution_for_cancel", + location=LOCATION, + project_id=PROJECT_ID, + execution=EXECUTION, + workflow_id=SLEEP_WORKFLOW_ID, + ) + + # [START how_to_cancel_execution] + cancel_execution = WorkflowsCancelExecutionOperator( + task_id="cancel_execution", + location=LOCATION, + project_id=PROJECT_ID, + workflow_id=SLEEP_WORKFLOW_ID, + execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}', + ) + # [END how_to_cancel_execution] + + # NOTE(review): delete_workflow only removes WORKFLOW_ID; nothing in this DAG deletes the + # SLEEP_WORKFLOW_ID workflow created by create_workflow_for_cancel - confirm this is intended. + create_workflow >> update_workflows >> [get_workflow, list_workflows] + update_workflows >> [create_execution, create_execution_for_cancel] + + create_execution >> wait_for_execution >> [get_execution, list_executions] + create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution + + [cancel_execution, list_executions] >> delete_workflow + + +if __name__ == '__main__': + dag.clear(dag_run_state=None) + dag.run() diff --git 
a/airflow/providers/google/cloud/hooks/workflows.py b/airflow/providers/google/cloud/hooks/workflows.py new file mode 100644 index 0000000..6c78350 --- /dev/null +++ b/airflow/providers/google/cloud/hooks/workflows.py @@ -0,0 +1,401 @@ +# Licensed to the Apache Software Foundation (ASF)
under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Dict, Optional, Sequence, Tuple, Union + +from google.api_core.operation import Operation +from google.api_core.retry import Retry + +# pylint: disable=no-name-in-module +from google.cloud.workflows.executions_v1beta import Execution, ExecutionsClient +from google.cloud.workflows.executions_v1beta.services.executions.pagers import ListExecutionsPager +from google.cloud.workflows_v1beta import Workflow, WorkflowsClient +from google.cloud.workflows_v1beta.services.workflows.pagers import ListWorkflowsPager +from google.protobuf.field_mask_pb2 import FieldMask + +from airflow.providers.google.common.hooks.base_google import GoogleBaseHook + +# pylint: enable=no-name-in-module + + +class WorkflowsHook(GoogleBaseHook): + """ + Hook for the Google Cloud Workflows and Workflow Executions APIs. + + All the methods in the hook where project_id is used must be called with + keyword arguments rather than positional. 
+ """ + + def get_workflows_client(self) -> WorkflowsClient: + """Returns a WorkflowsClient built with the hook's credentials and client info.""" + return WorkflowsClient(credentials=self._get_credentials(), client_info=self.client_info) + + def get_executions_client(self) -> ExecutionsClient: + """Returns an ExecutionsClient built with the hook's credentials and client info.""" + return ExecutionsClient(credentials=self._get_credentials(), client_info=self.client_info) + + @GoogleBaseHook.fallback_to_default_project_id + def create_workflow( + self, + workflow: Dict, + workflow_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Operation: + """ + Creates a new workflow. If a workflow with the specified name + already exists in the specified project and location, the long + running operation will return + [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error. + + :param workflow: Required. Workflow to be created. + :type workflow: Dict + :param workflow_id: Required. The ID of the workflow to be created. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :return: The long-running operation that creates the workflow. + """ + metadata = metadata or () + client = self.get_workflows_client() + parent = f"projects/{project_id}/locations/{location}" + return client.create_workflow( + request={"parent": parent, "workflow": workflow, "workflow_id": workflow_id}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_workflow( + self, + workflow_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Workflow: + """ + Gets details of a single Workflow. + + :param workflow_id: Required. The ID of the workflow to be retrieved. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :return: The requested workflow. + """ + metadata = metadata or () + client = self.get_workflows_client() + name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}" + return client.get_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata) + + def update_workflow( + self, + workflow: Union[Dict, Workflow], + update_mask: Optional[FieldMask] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Operation: + """ + Updates an existing workflow. 
+ Running this method has no impact on already running + executions of the workflow. A new revision of the + workflow may be created as a result of a successful + update operation. In that case, such revision will be + used in new workflow executions. + + No ``project_id``/``location`` parameters are taken: the request contains only + ``workflow`` and ``update_mask``. + + :param workflow: Required. Workflow to be updated. + :type workflow: Union[Dict, Workflow] + :param update_mask: List of fields to be updated. If not present, + the entire workflow will be updated. + :type update_mask: FieldMask + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :return: The long-running operation that updates the workflow. + """ + metadata = metadata or () + client = self.get_workflows_client() + return client.update_workflow( + request={"workflow": workflow, "update_mask": update_mask}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_workflow( + self, + workflow_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Operation: + """ + Deletes a workflow with the specified name. + This method also cancels and deletes all running + executions of the workflow. + + :param workflow_id: Required. The ID of the workflow to be deleted. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. 
+ :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :return: The long-running operation that deletes the workflow. + """ + metadata = metadata or () + client = self.get_workflows_client() + name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}" + return client.delete_workflow(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata) + + @GoogleBaseHook.fallback_to_default_project_id + def list_workflows( + self, + location: str, + project_id: str, + filter_: Optional[str] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> ListWorkflowsPager: + """ + Lists Workflows in a given project and location. + The default order is not specified. + + :param filter_: Filter to restrict results to specific workflows. + :type filter_: str + :param order_by: Comma-separated list of fields that + specify the order of the results. Default sorting order for a field is ascending. + To specify descending order for a field, append a "desc" suffix. + If not specified, the results will be returned in an unspecified order. + :type order_by: str + :param project_id: Required. The ID of the Google Cloud project the workflows belong to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. 
+ :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :return: A pager over the matching workflows. + """ + metadata = metadata or () + client = self.get_workflows_client() + parent = f"projects/{project_id}/locations/{location}" + + return client.list_workflows( + request={"parent": parent, "filter": filter_, "order_by": order_by}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_execution( + self, + workflow_id: str, + location: str, + project_id: str, + execution: Dict, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Execution: + """ + Creates a new execution using the latest revision of + the given workflow. + + :param execution: Required. Input parameters of the execution represented as a dictionary. + :type execution: Dict + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :return: The created execution. + """ + metadata = metadata or () + client = self.get_executions_client() + parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}" + return client.create_execution( + request={"parent": parent, "execution": execution}, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_execution( + self, + workflow_id: str, + execution_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Execution: + """ + Returns an execution for the given ``workflow_id`` and ``execution_id``. + + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param execution_id: Required. The ID of the execution. + :type execution_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :return: The requested execution. + """ + metadata = metadata or () + client = self.get_executions_client() + name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}" + return client.get_execution(request={"name": name}, retry=retry, timeout=timeout, metadata=metadata) + + @GoogleBaseHook.fallback_to_default_project_id + def cancel_execution( + self, + workflow_id: str, + execution_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> Execution: + """ + Cancels an execution using the given ``workflow_id`` and ``execution_id``. + + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param execution_id: Required. The ID of the execution. + :type execution_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :return: The execution returned by the cancel request. + """ + metadata = metadata or () + client = self.get_executions_client() + name = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}/executions/{execution_id}" + return client.cancel_execution( + request={"name": name}, retry=retry, timeout=timeout, metadata=metadata + ) + + @GoogleBaseHook.fallback_to_default_project_id + def list_executions( + self, + workflow_id: str, + location: str, + project_id: str, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + ) -> ListExecutionsPager: + """ + Returns a list of executions which belong to the + workflow with the given name. The method returns + executions of all workflow revisions. Returned + executions are ordered by their start time (newest + first). + + :param workflow_id: Required. The ID of the workflow whose executions are listed. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :return: A pager over the workflow's executions. + """ + metadata = metadata or () + client = self.get_executions_client() + parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_id}" + return client.list_executions( + request={"parent": parent}, retry=retry, timeout=timeout, metadata=metadata + ) diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py new file mode 100644 index 0000000..c7fc96d --- /dev/null +++ b/airflow/providers/google/cloud/operators/workflows.py @@ -0,0 +1,714 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import hashlib +import json +import re +import uuid +from datetime import datetime, timedelta +from typing import Dict, Optional, Sequence, Tuple, Union + +import pytz +from google.api_core.exceptions import AlreadyExists +from google.api_core.retry import Retry + +# pylint: disable=no-name-in-module +from google.cloud.workflows.executions_v1beta import Execution +from google.cloud.workflows_v1beta import Workflow + +# pylint: enable=no-name-in-module +from google.protobuf.field_mask_pb2 import FieldMask + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook + + +class WorkflowsCreateWorkflowOperator(BaseOperator): + """ + Creates a new workflow. If a workflow with the specified name + already exists in the specified project and location, the long + running operation will return + [ALREADY_EXISTS][google.rpc.Code.ALREADY_EXISTS] error. In that case this operator + catches ``AlreadyExists`` and fetches the existing workflow instead. The resulting + workflow is returned as a dict. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsCreateWorkflowOperator` + + :param workflow: Required. Workflow to be created. + :type workflow: Dict + :param workflow_id: Required. The ID of the workflow to be created. If it is empty, or if + ``force_rerun`` is set, a generated ID is used instead (see ``force_rerun``). + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. 
+ :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``. + :type gcp_conn_id: str + :param force_rerun: If True, a new workflow ID is generated from a random UUID on every run, + even when ``workflow_id`` is set. Otherwise an empty ``workflow_id`` is replaced by an ID + derived from the DAG id, task id, execution date and the serialized ``workflow``. + :type force_rerun: bool + :param impersonation_chain: Passed unchanged to ``WorkflowsHook``; presumably a service account, + or a chain of accounts, to impersonate. + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ("location", "workflow", "workflow_id") + template_fields_renderers = {"workflow": "json"} + + def __init__( + self, + *, + workflow: Dict, + workflow_id: str, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + force_rerun: bool = False, + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow = workflow + self.workflow_id = workflow_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + self.force_rerun = force_rerun + + def _workflow_id(self, context): + """ + Return the workflow ID to use for this run. + + A non-empty ``workflow_id`` is returned as-is unless ``force_rerun`` is set. Otherwise the + ID is the MD5 hex digest of the DAG id, the task id, ``execution_date``, and either the + serialized ``workflow`` or (with ``force_rerun``) a random UUID. 
+ """ + if self.workflow_id and not self.force_rerun: + # A user-supplied workflow ID is used unchanged; ensuring + # idempotency is then the user's responsibility + return self.workflow_id + + if self.force_rerun: + hash_base = str(uuid.uuid4()) + else: + hash_base = json.dumps(self.workflow, sort_keys=True) + + # Workflow IDs have a limited allowed length, so all of the + # information is hashed instead of being used verbatim + exec_date = context['execution_date'].isoformat() + base = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{hash_base}" + workflow_id = hashlib.md5(base.encode()).hexdigest() + # NOTE(review): an MD5 hex digest only contains the characters 0-9 and a-f, so this + # substitution never changes anything. The digest may also start with a digit - confirm + # this satisfies the Workflows ID naming rules. + return re.sub(r"[:\-+.]", "_", workflow_id) + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + workflow_id = self._workflow_id(context) + + self.log.info("Creating workflow") + try: + operation = hook.create_workflow( + workflow=self.workflow, + workflow_id=workflow_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + workflow = 
operation.result() + except AlreadyExists: + # The workflow already exists (for example on a retry, which reuses the same generated + # ID unless force_rerun is set), so fetch and return it instead of failing. + workflow = hook.get_workflow( + workflow_id=workflow_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return Workflow.to_dict(workflow) + + +class WorkflowsUpdateWorkflowOperator(BaseOperator): + """ + Updates an existing workflow. + Running this method has no impact on already running + executions of the workflow. A new revision of the + workflow may be created as a result of a successful + update operation. In that case, such revision will be + used in new workflow executions. + + The workflow is first fetched and then passed back, unmodified, to + ``WorkflowsHook.update_workflow`` together with ``update_mask``. The updated + workflow is returned as a dict. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsUpdateWorkflowOperator` + + :param workflow_id: Required. The ID of the workflow to be updated. + :type workflow_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. + :type project_id: str + :param update_mask: List of fields to be updated. If not present, + the entire workflow will be updated. + :type update_mask: FieldMask + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``. + :type gcp_conn_id: str + :param impersonation_chain: Passed unchanged to ``WorkflowsHook``; presumably a service account, + or a chain of accounts, to impersonate. + :type impersonation_chain: Union[str, Sequence[str]] + """ + + # NOTE(review): unlike the create/delete operators, ``location`` is not listed here, + # so it is not templated - confirm this is intended. + template_fields = ("workflow_id", "update_mask") + template_fields_renderers = {"update_mask": "json"} + + def __init__( + self, + *, + workflow_id: str, + location: str, + project_id: Optional[str] = None, + update_mask: Optional[FieldMask] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.location = location + self.project_id = project_id + self.update_mask = update_mask + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + + workflow = hook.get_workflow( + workflow_id=self.workflow_id, + project_id=self.project_id, + location=self.location, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + # NOTE(review): the fetched workflow is passed back unmodified, so this call only + # re-applies the workflow's current values (limited to update_mask) - confirm intended. 
+ self.log.info("Updating workflow") + operation = hook.update_workflow( + workflow=workflow, + update_mask=self.update_mask, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + workflow = operation.result() + return Workflow.to_dict(workflow) + + +class WorkflowsDeleteWorkflowOperator(BaseOperator): + """ + Deletes a workflow with the specified name. + This method also cancels and deletes all running + executions of the workflow. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsDeleteWorkflowOperator` + + :param workflow_id: Required. The ID of the workflow to be deleted. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the workflow belongs to. 
+ :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + :param gcp_conn_id: The connection ID passed to ``WorkflowsHook``. + :type gcp_conn_id: str + :param impersonation_chain: Passed unchanged to ``WorkflowsHook``; presumably a service account, + or a chain of accounts, to impersonate. + :type impersonation_chain: Union[str, Sequence[str]] + """ + + template_fields = ("location", "workflow_id") + + def __init__( + self, + *, + workflow_id: str, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Deleting workflow %s", self.workflow_id) + operation = hook.delete_workflow( + workflow_id=self.workflow_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + # Wait for the long-running delete operation to finish; its result is not used. 
+ operation.result() + + +class WorkflowsListWorkflowsOperator(BaseOperator): + """ + Lists Workflows in a given project and location. + The default order is not specified. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsListWorkflowsOperator` + + :param filter_: Filter to restrict results to specific workflows. + :type filter_: str + :param order_by: Comma-separated list of fields that that + specify the order of the results. Default sorting order for a field is ascending. + To specify descending order for a field, append a "desc" suffix. + If not specified, the results will be returned in an unspecified order. + :type order_by: str + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "order_by", "filter_") + + def __init__( + self, + *, + location: str, + project_id: Optional[str] = None, + filter_: Optional[str] = None, + order_by: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.filter_ = filter_ + self.order_by = order_by + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Retrieving workflows") + workflows_iter = hook.list_workflows( + filter_=self.filter_, + order_by=self.order_by, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return [Workflow.to_dict(w) for w in workflows_iter] + + +class WorkflowsGetWorkflowOperator(BaseOperator): + """ + Gets details of a single Workflow. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsGetWorkflowOperator` + + :param workflow_id: Required. The ID of the workflow to be created. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. 
+ :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "workflow_id") + + def __init__( + self, + *, + workflow_id: str, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Retrieving workflow") + workflow = hook.get_workflow( + workflow_id=self.workflow_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return Workflow.to_dict(workflow) + + +class WorkflowsCreateExecutionOperator(BaseOperator): + """ + Creates a new execution using the latest revision of + the given workflow. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsCreateExecutionOperator` + + :param execution: Required. Execution to be created. + :type execution: Dict + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. 
+ :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "workflow_id", "execution") + template_fields_renderers = {"execution": "json"} + + def __init__( + self, + *, + workflow_id: str, + execution: Dict, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.execution = execution + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Creating execution") + execution = hook.create_execution( + workflow_id=self.workflow_id, + execution=self.execution, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + execution_id = execution.name.split("/")[-1] + self.xcom_push(context, key="execution_id", value=execution_id) + return Execution.to_dict(execution) + + +class WorkflowsCancelExecutionOperator(BaseOperator): 
+ """ + Cancels an execution using the given ``workflow_id`` and ``execution_id``. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsCancelExecutionOperator` + + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param execution_id: Required. The ID of the execution. + :type execution_id: str + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "workflow_id", "execution_id") + + def __init__( + self, + *, + workflow_id: str, + execution_id: str, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.execution_id = execution_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Canceling execution %s", self.execution_id) + execution = hook.cancel_execution( + workflow_id=self.workflow_id, + execution_id=self.execution_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return Execution.to_dict(execution) + + +class WorkflowsListExecutionsOperator(BaseOperator): + """ + Returns a list of executions which belong to the + workflow with the given name. The method returns + executions of all workflow revisions. Returned + executions are ordered by their start time (newest + first). + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsListExecutionsOperator` + + :param workflow_id: Required. The ID of the workflow to be created. + :type workflow_id: str + :param start_date_filter: If passed only executions older that this date will be returned. 
+ By default operators return executions from last 60 minutes + :type start_date_filter: datetime + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. + :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "workflow_id") + + def __init__( + self, + *, + workflow_id: str, + location: str, + start_date_filter: Optional[datetime] = None, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.location = location + self.start_date_filter = start_date_filter or datetime.now(tz=pytz.UTC) - timedelta(minutes=60) + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Retrieving executions for workflow %s", self.workflow_id) + execution_iter = hook.list_executions( + workflow_id=self.workflow_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + 
metadata=self.metadata, + ) + + return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter] + + +class WorkflowsGetExecutionOperator(BaseOperator): + """ + Returns an execution for the given ``workflow_id`` and ``execution_id``. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:WorkflowsGetExecutionOperator` + + :param workflow_id: Required. The ID of the workflow. + :type workflow_id: str + :param execution_id: Required. The ID of the execution. + :type execution_id: str + :param project_id: Required. The ID of the Google Cloud project the cluster belongs to. + :type project_id: str + :param location: Required. The GCP region in which to handle the request. + :type location: str + :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be + retried. + :type retry: google.api_core.retry.Retry + :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if + ``retry`` is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Additional metadata that is provided to the method. 
+ :type metadata: Sequence[Tuple[str, str]] + """ + + template_fields = ("location", "workflow_id", "execution_id") + + def __init__( + self, + *, + workflow_id: str, + execution_id: str, + location: str, + project_id: Optional[str] = None, + retry: Optional[Retry] = None, + timeout: Optional[float] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = None, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: Optional[Union[str, Sequence[str]]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.workflow_id = workflow_id + self.execution_id = execution_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context): + hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) + self.log.info("Retrieving execution %s for workflow %s", self.execution_id, self.workflow_id) + execution = hook.get_execution( + workflow_id=self.workflow_id, + execution_id=self.execution_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + return Execution.to_dict(execution) diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py new file mode 100644 index 0000000..5950458 --- /dev/null +++ b/airflow/providers/google/cloud/sensors/workflows.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, Sequence, Set, Tuple, Union
+
+from google.api_core.retry import Retry
+from google.cloud.workflows.executions_v1beta import Execution
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+from airflow.sensors.base import BaseSensorOperator
+
+
+class WorkflowExecutionSensor(BaseSensorOperator):
+    """
+    Checks state of an execution for the given ``workflow_id`` and ``execution_id``.
+
+    The sensor succeeds when the execution reaches one of ``success_states`` and fails
+    (raises ``AirflowException``) when it reaches one of ``failure_states``.
+
+    :param workflow_id: Required. The ID of the workflow.
+    :type workflow_id: str
+    :param execution_id: Required. The ID of the execution.
+    :type execution_id: str
+    :param project_id: Required. The ID of the Google Cloud project the workflow belongs to.
+    :type project_id: str
+    :param location: Required. The GCP region in which to handle the request.
+    :type location: str
+    :param success_states: Execution states to be considered as successful, by default
+        it's only ``SUCCEEDED`` state
+    :type success_states: Set[Execution.State]
+    :param failure_states: Execution states to be considered as failures, by default
+        they are ``FAILED`` and ``CANCELLED`` states.
+    :type failure_states: Set[Execution.State]
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param request_timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type request_timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use when connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate, or a chain of accounts,
+        forwarded to ``WorkflowsHook``.
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("location", "workflow_id", "execution_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_id: str,
+        execution_id: str,
+        location: str,
+        project_id: str,
+        success_states: Optional[Set[Execution.State]] = None,
+        failure_states: Optional[Set[Execution.State]] = None,
+        retry: Optional[Retry] = None,
+        request_timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.success_states = success_states or {Execution.State.SUCCEEDED}
+        self.failure_states = failure_states or {Execution.State.FAILED, Execution.State.CANCELLED}
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id
+        self.location = location
+        self.project_id = project_id
+        self.retry = retry
+        self.request_timeout = request_timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context):
+        hook = WorkflowsHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Checking state of execution %s for workflow %s", self.execution_id, self.workflow_id)
+        execution: Execution = hook.get_execution(
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            location=self.location,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.request_timeout,
+            metadata=self.metadata,
+        )
+
+        state = execution.state
+        # Failure states are checked first, so a state listed in both sets fails the task.
+        if state in self.failure_states:
+            raise AirflowException(
+                f"Execution {self.execution_id} for workflow {self.workflow_id} "
+                f"failed and is in `{state}` state",
+            )
+
+        if state in self.success_states:
+            self.log.info(
+                "Execution %s for workflow %s completed with state: %s",
+                self.execution_id,
+                self.workflow_id,
+                state,
+            )
+            return True
+
+        self.log.info(
+            "Execution %s for workflow %s has not completed yet, current state: %s",
+            self.execution_id,
+            self.workflow_id,
+            state,
+        )
+        return False
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 67b7af4..39ba434 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -301,6 +301,11 @@ integrations: - /docs/apache-airflow-providers-google/operators/cloud/natural_language.rst logo: /integration-logos/gcp/Cloud-NLP.png tags: [gcp] + - integration-name: Google Cloud Workflows + external-doc-url: https://cloud.google.com/workflows/ + how-to-guide: + - /docs/apache-airflow-providers-google/operators/cloud/workflows.rst + tags: [gcp] operators: - integration-name: Google Ads @@ -401,6 +406,9 @@ operators: - integration-name: Google Cloud Vision python-modules: - airflow.providers.google.cloud.operators.vision + - integration-name: Google Cloud Workflows + python-modules: + - airflow.providers.google.cloud.operators.workflows - integration-name: Google Cloud Firestore python-modules: - airflow.providers.google.firebase.operators.firestore @@ -445,6 +453,9 @@ sensors: - integration-name: Google Cloud Pub/Sub python-modules: - airflow.providers.google.cloud.sensors.pubsub + - integration-name: Google Cloud Workflows + python-modules: + - airflow.providers.google.cloud.sensors.workflows - integration-name: Google Campaign Manager python-modules: - airflow.providers.google.marketing_platform.sensors.campaign_manager @@ -565,6 +576,9 @@ hooks: - integration-name: Google Cloud Vision python-modules: - airflow.providers.google.cloud.hooks.vision + - integration-name: Google Cloud Workflows + python-modules: + - airflow.providers.google.cloud.hooks.workflows - integration-name: Google python-modules: - airflow.providers.google.common.hooks.base_google diff --git
a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst new file mode 100644 index 0000000..551a7ca --- /dev/null +++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst @@ -0,0 +1,185 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Google Cloud Workflows Operators +================================ + +You can use Workflows to create serverless workflows that link a series of serverless tasks together +in an order you define. Combine the power of Google Cloud's APIs, serverless products like Cloud +Functions and Cloud Run, and calls to external APIs to create flexible serverless applications. + +For more information about the service, visit the +`Workflows product documentation <https://cloud.google.com/workflows/docs/overview>`__. + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +------------------ + +.. include:: /operators/_partials/prerequisite_tasks.rst + + +.. _howto/operator:WorkflowsCreateWorkflowOperator: + +Create workflow +=============== + +To create a workflow use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateWorkflowOperator`. + +.
exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_create_workflow] + :end-before: [END how_to_create_workflow] + +The workflow should be defined in a similar way to this example: + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 0 + :start-after: [START how_to_define_workflow] + :end-before: [END how_to_define_workflow] + +For more information about authoring workflows, check the official +`product documentation <https://cloud.google.com/workflows/docs/overview>`__. + + +.. _howto/operator:WorkflowsUpdateWorkflowOperator: + +Update workflow +=============== + +To update a workflow use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsUpdateWorkflowOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_update_workflow] + :end-before: [END how_to_update_workflow] + +.. _howto/operator:WorkflowsGetWorkflowOperator: + +Get workflow +============ + +To get a workflow use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetWorkflowOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_get_workflow] + :end-before: [END how_to_get_workflow] + +.. _howto/operator:WorkflowsListWorkflowsOperator: + +List workflows +============== + +To list workflows use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListWorkflowsOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_list_workflows] + :end-before: [END how_to_list_workflows] + +..
_howto/operator:WorkflowsDeleteWorkflowOperator: + +Delete workflow +=============== + +To delete a workflow use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_delete_workflow] + :end-before: [END how_to_delete_workflow] + +.. _howto/operator:WorkflowsCreateExecutionOperator: + +Create execution +================ + +To create an execution use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateExecutionOperator`. +This operator is not idempotent due to an API limitation. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_create_execution] + :end-before: [END how_to_create_execution] + +The create operator does not wait for the execution to complete. To wait for the execution result, use +:class:`~airflow.providers.google.cloud.sensors.workflows.WorkflowExecutionSensor`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_wait_for_execution] + :end-before: [END how_to_wait_for_execution] + +.. _howto/operator:WorkflowsGetExecutionOperator: + +Get execution +============= + +To get an execution use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetExecutionOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_get_execution] + :end-before: [END how_to_get_execution] + +.. _howto/operator:WorkflowsListExecutionsOperator: + +List executions +=============== + +To list executions use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListExecutionsOperator`.
+By default, this operator returns only executions from the last 60 minutes. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_list_executions] + :end-before: [END how_to_list_executions] + +.. _howto/operator:WorkflowsCancelExecutionOperator: + +Cancel execution +================ + +To cancel an execution use +:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCancelExecutionOperator`. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py + :language: python + :dedent: 4 + :start-after: [START how_to_cancel_execution] + :end-before: [END how_to_cancel_execution] diff --git a/setup.py b/setup.py index 6e2a2f7..67e6a85 100644 --- a/setup.py +++ b/setup.py @@ -278,6 +278,7 @@ flask_oauth = [ google = [ 'PyOpenSSL', 'google-ads>=4.0.0,<8.0.0', + 'google-api-core>=1.25.1,<2.0.0', 'google-api-python-client>=1.6.0,<2.0.0', 'google-auth>=1.0.0,<2.0.0', 'google-auth-httplib2>=0.0.1', @@ -305,6 +306,7 @@ google = [ 'google-cloud-translate>=1.5.0,<2.0.0', 'google-cloud-videointelligence>=1.7.0,<2.0.0', 'google-cloud-vision>=0.35.2,<2.0.0', + 'google-cloud-workflows>=0.1.0,<2.0.0', 'grpcio-gcp>=0.2.2', 'json-merge-patch~=0.2', 'pandas-gbq', diff --git a/tests/providers/google/cloud/hooks/test_workflows.py b/tests/providers/google/cloud/hooks/test_workflows.py new file mode 100644 index 0000000..4f3d4d0 --- /dev/null +++ b/tests/providers/google/cloud/hooks/test_workflows.py @@ -0,0 +1,256 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook + +BASE_PATH = "airflow.providers.google.cloud.hooks.workflows.{}" +LOCATION = "europe-west1" +WORKFLOW_ID = "workflow_id" +EXECUTION_ID = "execution_id" +WORKFLOW = {"aa": "bb"} +EXECUTION = {"ccc": "ddd"} +PROJECT_ID = "airflow-testing" +METADATA = () +TIMEOUT = None +RETRY = None +FILTER_ = "aaaa" +ORDER_BY = "bbb" +UPDATE_MASK = "aaa,bbb" + +WORKFLOW_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}" +WORKFLOW_NAME = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}" +EXECUTION_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}" +EXECUTION_NAME = ( + f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}/executions/{EXECUTION_ID}" +) + + +def mock_init(*args, **kwargs): + pass + + +class TestWorkflowsHook: + def setup_method(self, _): + with mock.patch(BASE_PATH.format("GoogleBaseHook.__init__"), new=mock_init): + self.hook = WorkflowsHook(gcp_conn_id="test") # pylint: disable=attribute-defined-outside-init + + @mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials")) + @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock) + @mock.patch(BASE_PATH.format("WorkflowsClient")) + def test_get_workflows_client(self, mock_client, mock_client_info, mock_get_credentials): + self.hook.get_workflows_client() + mock_client.assert_called_once_with( + credentials=mock_get_credentials.return_value, + client_info=mock_client_info.return_value, + ) + + 
@mock.patch(BASE_PATH.format("WorkflowsHook._get_credentials")) + @mock.patch(BASE_PATH.format("WorkflowsHook.client_info"), new_callable=mock.PropertyMock) + @mock.patch(BASE_PATH.format("ExecutionsClient")) + def test_get_executions_client(self, mock_client, mock_client_info, mock_get_credentials): + self.hook.get_executions_client() + mock_client.assert_called_once_with( + credentials=mock_get_credentials.return_value, + client_info=mock_client_info.return_value, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client")) + def test_create_workflow(self, mock_client): + result = self.hook.create_workflow( + workflow=WORKFLOW, + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.create_workflow.return_value == result + mock_client.return_value.create_workflow.assert_called_once_with( + request=dict(workflow=WORKFLOW, workflow_id=WORKFLOW_ID, parent=WORKFLOW_PARENT), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client")) + def test_get_workflow(self, mock_client): + result = self.hook.get_workflow( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.get_workflow.return_value == result + mock_client.return_value.get_workflow.assert_called_once_with( + request=dict(name=WORKFLOW_NAME), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client")) + def test_update_workflow(self, mock_client): + result = self.hook.update_workflow( + workflow=WORKFLOW, + update_mask=UPDATE_MASK, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.update_workflow.return_value == result + mock_client.return_value.update_workflow.assert_called_once_with( + 
request=dict( + workflow=WORKFLOW, + update_mask=UPDATE_MASK, + ), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client")) + def test_delete_workflow(self, mock_client): + result = self.hook.delete_workflow( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.delete_workflow.return_value == result + mock_client.return_value.delete_workflow.assert_called_once_with( + request=dict(name=WORKFLOW_NAME), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_workflows_client")) + def test_list_workflows(self, mock_client): + result = self.hook.list_workflows( + location=LOCATION, + project_id=PROJECT_ID, + filter_=FILTER_, + order_by=ORDER_BY, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.list_workflows.return_value == result + mock_client.return_value.list_workflows.assert_called_once_with( + request=dict( + parent=WORKFLOW_PARENT, + filter=FILTER_, + order_by=ORDER_BY, + ), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client")) + def test_create_execution(self, mock_client): + result = self.hook.create_execution( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + execution=EXECUTION, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.create_execution.return_value == result + mock_client.return_value.create_execution.assert_called_once_with( + request=dict( + parent=EXECUTION_PARENT, + execution=EXECUTION, + ), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client")) + def test_get_execution(self, mock_client): + result = self.hook.get_execution( + 
workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.get_execution.return_value == result + mock_client.return_value.get_execution.assert_called_once_with( + request=dict(name=EXECUTION_NAME), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client")) + def test_cancel_execution(self, mock_client): + result = self.hook.cancel_execution( + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.cancel_execution.return_value == result + mock_client.return_value.cancel_execution.assert_called_once_with( + request=dict(name=EXECUTION_NAME), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + @mock.patch(BASE_PATH.format("WorkflowsHook.get_executions_client")) + def test_list_execution(self, mock_client): + result = self.hook.list_executions( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert mock_client.return_value.list_executions.return_value == result + mock_client.return_value.list_executions.assert_called_once_with( + request=dict(parent=EXECUTION_PARENT), + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py new file mode 100644 index 0000000..5578548 --- /dev/null +++ b/tests/providers/google/cloud/operators/test_workflows.py @@ -0,0 +1,383 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime +from unittest import mock + +import pytz + +from airflow.providers.google.cloud.operators.workflows import ( + WorkflowsCancelExecutionOperator, + WorkflowsCreateExecutionOperator, + WorkflowsCreateWorkflowOperator, + WorkflowsDeleteWorkflowOperator, + WorkflowsGetExecutionOperator, + WorkflowsGetWorkflowOperator, + WorkflowsListExecutionsOperator, + WorkflowsListWorkflowsOperator, + WorkflowsUpdateWorkflowOperator, +) + +BASE_PATH = "airflow.providers.google.cloud.operators.workflows.{}" +LOCATION = "europe-west1" +WORKFLOW_ID = "workflow_id" +EXECUTION_ID = "execution_id" +WORKFLOW = {"aa": "bb"} +EXECUTION = {"ccc": "ddd"} +PROJECT_ID = "airflow-testing" +METADATA = None +TIMEOUT = None +RETRY = None +FILTER_ = "aaaa" +ORDER_BY = "bbb" +UPDATE_MASK = "aaa,bbb" +GCP_CONN_ID = "test-conn" +IMPERSONATION_CHAIN = None + + +class TestWorkflowsCreateWorkflowOperator: + @mock.patch(BASE_PATH.format("Workflow")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + op = WorkflowsCreateWorkflowOperator( + task_id="test_task", + workflow=WORKFLOW, + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + 
impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.create_workflow.assert_called_once_with( + workflow=WORKFLOW, + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == mock_object.to_dict.return_value + + +class TestWorkflowsUpdateWorkflowOperator: + @mock.patch(BASE_PATH.format("Workflow")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + op = WorkflowsUpdateWorkflowOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + update_mask=UPDATE_MASK, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.get_workflow.assert_called_once_with( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + mock_hook.return_value.update_workflow.assert_called_once_with( + workflow=mock_hook.return_value.get_workflow.return_value, + update_mask=UPDATE_MASK, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == mock_object.to_dict.return_value + + +class TestWorkflowsDeleteWorkflowOperator: + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute( + self, + mock_hook, + ): + op = WorkflowsDeleteWorkflowOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + 
mock_hook.return_value.delete_workflow.assert_called_once_with( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + +class TestWorkflowsListWorkflowsOperator: + @mock.patch(BASE_PATH.format("Workflow")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + workflow_mock = mock.MagicMock() + workflow_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5) + mock_hook.return_value.list_workflows.return_value = [workflow_mock] + + op = WorkflowsListWorkflowsOperator( + task_id="test_task", + location=LOCATION, + project_id=PROJECT_ID, + filter_=FILTER_, + order_by=ORDER_BY, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.list_workflows.assert_called_once_with( + location=LOCATION, + project_id=PROJECT_ID, + filter_=FILTER_, + order_by=ORDER_BY, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == [mock_object.to_dict.return_value] + + +class TestWorkflowsGetWorkflowOperator: + @mock.patch(BASE_PATH.format("Workflow")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + op = WorkflowsGetWorkflowOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.get_workflow.assert_called_once_with( + workflow_id=WORKFLOW_ID, + location=LOCATION, + 
project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == mock_object.to_dict.return_value + + +class TestWorkflowExecutionsCreateExecutionOperator: + @mock.patch(BASE_PATH.format("Execution")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + @mock.patch(BASE_PATH.format("WorkflowsCreateExecutionOperator.xcom_push")) + def test_execute(self, mock_xcom, mock_hook, mock_object): + mock_hook.return_value.create_execution.return_value.name = "name/execution_id" + op = WorkflowsCreateExecutionOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution=EXECUTION, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.create_execution.assert_called_once_with( + workflow_id=WORKFLOW_ID, + execution=EXECUTION, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id") + assert result == mock_object.to_dict.return_value + + +class TestWorkflowExecutionsCancelExecutionOperator: + @mock.patch(BASE_PATH.format("Execution")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + op = WorkflowsCancelExecutionOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.cancel_execution.assert_called_once_with( 
+ workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == mock_object.to_dict.return_value + + +class TestWorkflowExecutionsListExecutionsOperator: + @mock.patch(BASE_PATH.format("Execution")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + execution_mock = mock.MagicMock() + execution_mock.start_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(minutes=5) + mock_hook.return_value.list_executions.return_value = [execution_mock] + + op = WorkflowsListExecutionsOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.list_executions.assert_called_once_with( + workflow_id=WORKFLOW_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == [mock_object.to_dict.return_value] + + +class TestWorkflowExecutionsGetExecutionOperator: + @mock.patch(BASE_PATH.format("Execution")) + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_execute(self, mock_hook, mock_object): + op = WorkflowsGetExecutionOperator( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.execute({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.get_execution.assert_called_once_with( + 
workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result == mock_object.to_dict.return_value diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py new file mode 100644 index 0000000..0a768ed --- /dev/null +++ b/tests/providers/google/cloud/operators/test_workflows_system.py @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import pytest + +from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY +from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context + + +@pytest.mark.system("google.cloud") +@pytest.mark.credential_file(GCP_WORKFLOWS_KEY) +class CloudWorkflowsExampleDagsSystemTest(GoogleSystemTest): + @provide_gcp_context(GCP_WORKFLOWS_KEY) + def test_run_example_workflow_dag(self): + self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER) diff --git a/tests/providers/google/cloud/sensors/test_workflows.py b/tests/providers/google/cloud/sensors/test_workflows.py new file mode 100644 index 0000000..56ad958 --- /dev/null +++ b/tests/providers/google/cloud/sensors/test_workflows.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +from unittest import mock + +import pytest +from google.cloud.workflows.executions_v1beta import Execution + +from airflow.exceptions import AirflowException +from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor + +BASE_PATH = "airflow.providers.google.cloud.sensors.workflows.{}" +LOCATION = "europe-west1" +WORKFLOW_ID = "workflow_id" +EXECUTION_ID = "execution_id" +PROJECT_ID = "airflow-testing" +METADATA = None +TIMEOUT = None +RETRY = None +GCP_CONN_ID = "test-conn" +IMPERSONATION_CHAIN = None + + +class TestWorkflowExecutionSensor: + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_poke_success(self, mock_hook): + mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.SUCCEEDED) + op = WorkflowExecutionSensor( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + request_timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.poke({}) + + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + mock_hook.return_value.get_execution.assert_called_once_with( + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + timeout=TIMEOUT, + metadata=METADATA, + ) + + assert result is True + + @mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_poke_wait(self, mock_hook): + mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.ACTIVE) + op = WorkflowExecutionSensor( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + request_timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + result = op.poke({}) + + assert result is False + + 
@mock.patch(BASE_PATH.format("WorkflowsHook")) + def test_poke_failure(self, mock_hook): + mock_hook.return_value.get_execution.return_value = mock.MagicMock(state=Execution.State.FAILED) + op = WorkflowExecutionSensor( + task_id="test_task", + workflow_id=WORKFLOW_ID, + execution_id=EXECUTION_ID, + location=LOCATION, + project_id=PROJECT_ID, + retry=RETRY, + request_timeout=TIMEOUT, + metadata=METADATA, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + with pytest.raises(AirflowException): + op.poke({}) diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py index 8aa55b8..8cf222e 100644 --- a/tests/providers/google/cloud/utils/gcp_authenticator.py +++ b/tests/providers/google/cloud/utils/gcp_authenticator.py @@ -54,6 +54,7 @@ GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json' GCP_SPANNER_KEY = 'gcp_spanner.json' GCP_STACKDRIVER = 'gcp_stackdriver.json' GCP_TASKS_KEY = 'gcp_tasks.json' +GCP_WORKFLOWS_KEY = "gcp_workflows.json" GMP_KEY = 'gmp.json' G_FIREBASE_KEY = 'g_firebase.json' GCP_AWS_KEY = 'gcp_aws.json'