This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c88e746494 Dynamic setting up of artifact versions for Datafusion 
pipelines (#34068)
c88e746494 is described below

commit c88e746494a0ccc718687fe230b02390309c0ea7
Author: max <[email protected]>
AuthorDate: Mon Sep 4 11:30:49 2023 +0200

    Dynamic setting up of artifact versions for Datafusion pipelines (#34068)
---
 airflow/providers/google/cloud/hooks/datafusion.py |  21 +-
 .../operators/cloud/datafusion.rst                 |   2 +-
 .../google/cloud/hooks/test_datafusion.py          |  25 +-
 .../google/cloud/datafusion/example_datafusion.py  |  40 ++-
 .../cloud/datafusion/example_datafusion_async.py   | 286 ---------------------
 5 files changed, 80 insertions(+), 294 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py 
b/airflow/providers/google/cloud/hooks/datafusion.py
index dcf51357c6..54ae768f15 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -153,7 +153,7 @@ class DataFusionHook(GoogleBaseHook):
         return os.path.join(instance_url, "v3", "namespaces", 
quote(namespace), "apps")
 
     def _cdap_request(
-        self, url: str, method: str, body: list | dict | None = None
+        self, url: str, method: str, body: list | dict | None = None, params: 
dict | None = None
     ) -> google.auth.transport.Response:
         headers: dict[str, str] = {"Content-Type": "application/json"}
         request = google.auth.transport.requests.Request()
@@ -163,7 +163,7 @@ class DataFusionHook(GoogleBaseHook):
 
         payload = json.dumps(body) if body else None
 
-        response = request(method=method, url=url, headers=headers, 
body=payload)
+        response = request(method=method, url=url, headers=headers, 
body=payload, params=params)
         return response
 
     @staticmethod
@@ -282,6 +282,23 @@ class DataFusionHook(GoogleBaseHook):
         )
         return instance
 
+    def get_instance_artifacts(
+        self, instance_url: str, namespace: str = "default", scope: str = 
"SYSTEM"
+    ) -> Any:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            quote(namespace),
+            "artifacts",
+        )
+        response = self._cdap_request(url=url, method="GET", params={"scope": 
scope})
+        self._check_response_status_and_data(
+            response, f"Retrieving an instance artifacts failed with code 
{response.status}"
+        )
+        content = json.loads(response.data)
+        return content
+
     @GoogleBaseHook.fallback_to_default_project_id
     def patch_instance(
         self,
diff --git 
a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst 
b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
index 15b7638289..0a728cdb10 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
@@ -179,7 +179,7 @@ It is not possible to use both asynchronous and deferrable 
parameters at the sam
 Please, check the example of using deferrable mode:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
 
-.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_start_pipeline_def]
diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py 
b/tests/providers/google/cloud/hooks/test_datafusion.py
index 50ea6b19d0..9ee4b9d39e 100644
--- a/tests/providers/google/cloud/hooks/test_datafusion.py
+++ b/tests/providers/google/cloud/hooks/test_datafusion.py
@@ -168,6 +168,24 @@ class TestDataFusionHook:
         assert result == "value"
         method_mock.assert_called_once_with(name=hook._name(PROJECT_ID, 
LOCATION, INSTANCE_NAME))
 
+    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+    def test_get_instance_artifacts(self, mock_request, hook):
+        scope = "SYSTEM"
+        artifact = {
+            "name": "test-artifact",
+            "version": "1.2.3",
+            "scope": scope,
+        }
+        mock_request.return_value = mock.MagicMock(status=200, 
data=json.dumps([artifact]))
+
+        hook.get_instance_artifacts(instance_url=INSTANCE_URL, scope=scope)
+
+        mock_request.assert_called_with(
+            url=f"{INSTANCE_URL}/v3/namespaces/default/artifacts",
+            method="GET",
+            params={"scope": scope},
+        )
+
     @mock.patch("google.auth.transport.requests.Request")
     @mock.patch(HOOK_STR.format("DataFusionHook.get_credentials"))
     def test_cdap_request(self, get_credentials_mock, mock_request, hook):
@@ -177,14 +195,17 @@ class TestDataFusionHook:
         request = mock_request.return_value
         request.return_value = mock.MagicMock()
         body = {"data": "value"}
+        params = {"param_key": "param_value"}
 
-        result = hook._cdap_request(url=url, method=method, body=body)
+        result = hook._cdap_request(url=url, method=method, body=body, 
params=params)
         mock_request.assert_called_once_with()
         get_credentials_mock.assert_called_once_with()
         
get_credentials_mock.return_value.before_request.assert_called_once_with(
             request=request, method=method, url=url, headers=headers
         )
-        request.assert_called_once_with(method=method, url=url, 
headers=headers, body=json.dumps(body))
+        request.assert_called_once_with(
+            method=method, url=url, headers=headers, body=json.dumps(body), 
params=params
+        )
         assert result == request.return_value
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
diff --git 
a/tests/system/providers/google/cloud/datafusion/example_datafusion.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4b26639d15..c4926e3780 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -23,6 +23,8 @@ import os
 from datetime import datetime
 
 from airflow import models
+from airflow.decorators import task
+from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
     CloudDataFusionCreatePipelineOperator,
@@ -61,7 +63,7 @@ PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.8.3",
+        "version": "{{ 
task_instance.xcom_pull(task_ids='get_artifacts_versions')['cdap-data-pipeline']
 }}",
         "scope": "SYSTEM",
     },
     "description": "Data Pipeline Application",
@@ -82,7 +84,12 @@ PIPELINE = {
                     "name": "GCSFile",
                     "type": "batchsource",
                     "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
+                    "artifact": {
+                        "name": "google-cloud",
+                        "version": "{{ 
task_instance.xcom_pull(task_ids='get_artifacts_versions')\
+                            ['google-cloud'] }}",
+                        "scope": "SYSTEM",
+                    },
                     "properties": {
                         "project": "auto-detect",
                         "format": "text",
@@ -111,7 +118,12 @@ PIPELINE = {
                     "name": "GCS",
                     "type": "batchsink",
                     "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
+                    "artifact": {
+                        "name": "google-cloud",
+                        "version": "{{ 
task_instance.xcom_pull(task_ids='get_artifacts_versions')\
+                            ['google-cloud'] }}",
+                        "scope": "SYSTEM",
+                    },
                     "properties": {
                         "project": "auto-detect",
                         "suffix": "yyyy-MM-dd-HH-mm",
@@ -147,6 +159,9 @@ PIPELINE = {
 }
 # [END howto_data_fusion_env_variables]
 
+CloudDataFusionCreatePipelineOperator.template_fields += ("pipeline",)
+
+
 with models.DAG(
     DAG_ID,
     start_date=datetime(2021, 1, 1),
@@ -196,6 +211,13 @@ with models.DAG(
     )
     # [END howto_cloud_data_fusion_update_instance_operator]
 
+    @task(task_id="get_artifacts_versions")
+    def get_artifacts_versions(ti) -> dict:
+        hook = DataFusionHook()
+        instance_url = ti.xcom_pull(task_ids="get_instance", 
key="return_value")["apiEndpoint"]
+        artifacts = hook.get_instance_artifacts(instance_url=instance_url, 
namespace="default")
+        return {item["name"]: item["version"] for item in artifacts}
+
     # [START howto_cloud_data_fusion_create_pipeline]
     create_pipeline = CloudDataFusionCreatePipelineOperator(
         location=LOCATION,
@@ -221,6 +243,16 @@ with models.DAG(
     )
     # [END howto_cloud_data_fusion_start_pipeline]
 
+    # [START howto_cloud_data_fusion_start_pipeline_def]
+    start_pipeline_def = CloudDataFusionStartPipelineOperator(
+        location=LOCATION,
+        pipeline_name=PIPELINE_NAME,
+        instance_name=INSTANCE_NAME,
+        task_id="start_pipeline_def",
+        deferrable=True,
+    )
+    # [END howto_cloud_data_fusion_start_pipeline_def]
+
     # [START howto_cloud_data_fusion_start_pipeline_async]
     start_pipeline_async = CloudDataFusionStartPipelineOperator(
         location=LOCATION,
@@ -284,10 +316,12 @@ with models.DAG(
         # TEST BODY
         >> create_instance
         >> get_instance
+        >> get_artifacts_versions()
         >> restart_instance
         >> update_instance
         >> create_pipeline
         >> list_pipelines
+        >> start_pipeline_def
         >> start_pipeline_async
         >> start_pipeline_sensor
         >> start_pipeline
diff --git 
a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
deleted file mode 100644
index 2606d30ab8..0000000000
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example Airflow DAG that shows how to use DataFusion.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.providers.google.cloud.operators.datafusion import (
-    CloudDataFusionCreateInstanceOperator,
-    CloudDataFusionCreatePipelineOperator,
-    CloudDataFusionDeleteInstanceOperator,
-    CloudDataFusionDeletePipelineOperator,
-    CloudDataFusionGetInstanceOperator,
-    CloudDataFusionListPipelinesOperator,
-    CloudDataFusionRestartInstanceOperator,
-    CloudDataFusionStartPipelineOperator,
-    CloudDataFusionStopPipelineOperator,
-    CloudDataFusionUpdateInstanceOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-# [START howto_data_fusion_env_variables]
-SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_datafusion_async"
-LOCATION = "europe-north1"
-INSTANCE_NAME = f"async-df-{ENV_ID}".replace("_", "-")
-INSTANCE = {
-    "type": "BASIC",
-    "displayName": INSTANCE_NAME,
-    "dataprocServiceAccount": SERVICE_ACCOUNT,
-}
-
-BUCKET_NAME_1 = f"a-bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
-BUCKET_NAME_2 = f"a-bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
-BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
-BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
-
-PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
-PIPELINE = {
-    "artifact": {
-        "name": "cdap-data-pipeline",
-        "version": "6.8.3",
-        "scope": "SYSTEM",
-    },
-    "description": "Data Pipeline Application",
-    "name": PIPELINE_NAME,
-    "config": {
-        "resources": {"memoryMB": 2048, "virtualCores": 1},
-        "driverResources": {"memoryMB": 2048, "virtualCores": 1},
-        "connections": [{"from": "GCS", "to": "GCS2"}],
-        "comments": [],
-        "postActions": [],
-        "properties": {},
-        "processTimingEnabled": "true",
-        "stageLoggingEnabled": "false",
-        "stages": [
-            {
-                "name": "GCS",
-                "plugin": {
-                    "name": "GCSFile",
-                    "type": "batchsource",
-                    "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
-                    "properties": {
-                        "project": "auto-detect",
-                        "format": "text",
-                        "skipHeader": "false",
-                        "serviceFilePath": "auto-detect",
-                        "filenameOnly": "false",
-                        "recursive": "false",
-                        "encrypted": "false",
-                        "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
-                            
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                        "path": BUCKET_NAME_1_URI,
-                        "referenceName": "foo_bucket",
-                        "useConnection": "false",
-                        "serviceAccountType": "filePath",
-                        "sampleSize": "1000",
-                        "fileEncoding": "UTF-8",
-                    },
-                },
-                "outputSchema": '{"type":"record","name":"textfile","fields"\
-                    
:[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
-                "id": "GCS",
-            },
-            {
-                "name": "GCS2",
-                "plugin": {
-                    "name": "GCS",
-                    "type": "batchsink",
-                    "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
-                    "properties": {
-                        "project": "auto-detect",
-                        "suffix": "yyyy-MM-dd-HH-mm",
-                        "format": "json",
-                        "serviceFilePath": "auto-detect",
-                        "location": "us",
-                        "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
-                            
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                        "referenceName": "bar",
-                        "path": BUCKET_NAME_2_URI,
-                        "serviceAccountType": "filePath",
-                        "contentType": "application/octet-stream",
-                    },
-                },
-                "outputSchema": '{"type":"record","name":"textfile","fields"\
-                    
:[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
-                "inputSchema": [
-                    {
-                        "name": "GCS",
-                        "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
-                            
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                    }
-                ],
-                "id": "GCS2",
-            },
-        ],
-        "schedule": "0 * * * *",
-        "engine": "spark",
-        "numOfRecordsPreview": 100,
-        "description": "Data Pipeline Application",
-        "maxConcurrentRuns": 1,
-    },
-}
-# [END howto_data_fusion_env_variables]
-
-with models.DAG(
-    DAG_ID,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example", "datafusion", "deferrable"],
-) as dag:
-    create_bucket1 = GCSCreateBucketOperator(
-        task_id="create_bucket1",
-        bucket_name=BUCKET_NAME_1,
-        project_id=PROJECT_ID,
-    )
-
-    create_bucket2 = GCSCreateBucketOperator(
-        task_id="create_bucket2",
-        bucket_name=BUCKET_NAME_2,
-        project_id=PROJECT_ID,
-    )
-
-    # [START howto_cloud_data_fusion_create_instance_operator]
-    create_instance = CloudDataFusionCreateInstanceOperator(
-        location=LOCATION,
-        instance_name=INSTANCE_NAME,
-        instance=INSTANCE,
-        task_id="create_instance",
-    )
-    # [END howto_cloud_data_fusion_create_instance_operator]
-
-    # [START howto_cloud_data_fusion_get_instance_operator]
-    get_instance = CloudDataFusionGetInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
-    )
-    # [END howto_cloud_data_fusion_get_instance_operator]
-
-    # [START howto_cloud_data_fusion_restart_instance_operator]
-    restart_instance = CloudDataFusionRestartInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, 
task_id="restart_instance"
-    )
-    # [END howto_cloud_data_fusion_restart_instance_operator]
-
-    # [START howto_cloud_data_fusion_update_instance_operator]
-    update_instance = CloudDataFusionUpdateInstanceOperator(
-        location=LOCATION,
-        instance_name=INSTANCE_NAME,
-        instance=INSTANCE,
-        update_mask="",
-        task_id="update_instance",
-    )
-    # [END howto_cloud_data_fusion_update_instance_operator]
-
-    # [START howto_cloud_data_fusion_create_pipeline]
-    create_pipeline = CloudDataFusionCreatePipelineOperator(
-        location=LOCATION,
-        pipeline_name=PIPELINE_NAME,
-        pipeline=PIPELINE,
-        instance_name=INSTANCE_NAME,
-        task_id="create_pipeline",
-    )
-    # [END howto_cloud_data_fusion_create_pipeline]
-
-    # [START howto_cloud_data_fusion_list_pipelines]
-    list_pipelines = CloudDataFusionListPipelinesOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, 
task_id="list_pipelines"
-    )
-    # [END howto_cloud_data_fusion_list_pipelines]
-
-    # [START howto_cloud_data_fusion_start_pipeline_def]
-    start_pipeline_def = CloudDataFusionStartPipelineOperator(
-        location=LOCATION,
-        pipeline_name=PIPELINE_NAME,
-        instance_name=INSTANCE_NAME,
-        deferrable=True,
-        task_id="start_pipeline_def",
-    )
-    # [END howto_cloud_data_fusion_start_pipeline_def]
-
-    # [START howto_cloud_data_fusion_stop_pipeline]
-    stop_pipeline = CloudDataFusionStopPipelineOperator(
-        location=LOCATION,
-        pipeline_name=PIPELINE_NAME,
-        instance_name=INSTANCE_NAME,
-        task_id="stop_pipeline",
-    )
-    # [END howto_cloud_data_fusion_stop_pipeline]
-
-    # [START howto_cloud_data_fusion_delete_pipeline]
-    delete_pipeline = CloudDataFusionDeletePipelineOperator(
-        location=LOCATION,
-        pipeline_name=PIPELINE_NAME,
-        instance_name=INSTANCE_NAME,
-        task_id="delete_pipeline",
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-    # [END howto_cloud_data_fusion_delete_pipeline]
-
-    # [START howto_cloud_data_fusion_delete_instance_operator]
-    delete_instance = CloudDataFusionDeleteInstanceOperator(
-        location=LOCATION,
-        instance_name=INSTANCE_NAME,
-        task_id="delete_instance",
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-    # [END howto_cloud_data_fusion_delete_instance_operator]
-
-    delete_bucket1 = GCSDeleteBucketOperator(
-        task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
-    )
-    delete_bucket2 = GCSDeleteBucketOperator(
-        task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
-    )
-
-    (
-        # TEST SETUP
-        [create_bucket1, create_bucket2]
-        # TEST BODY
-        >> create_instance
-        >> get_instance
-        >> restart_instance
-        >> update_instance
-        >> create_pipeline
-        >> list_pipelines
-        >> start_pipeline_def
-        >> stop_pipeline
-        >> delete_pipeline
-        >> delete_instance
-        # TEST TEARDOWN
-        >> [delete_bucket1, delete_bucket2]
-    )
-
-    from tests.system.utils.watcher import watcher
-
-# This test needs watcher in order to properly mark success/failure
-# when "tearDown" task with trigger rule is part of the DAG
-list(dag.tasks) >> watcher()
-
-from tests.system.utils import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)

Reply via email to