This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c88e746494 Dynamic setting up of artifact versions for Datafusion
pipelines (#34068)
c88e746494 is described below
commit c88e746494a0ccc718687fe230b02390309c0ea7
Author: max <[email protected]>
AuthorDate: Mon Sep 4 11:30:49 2023 +0200
Dynamic setting up of artifact versions for Datafusion pipelines (#34068)
---
airflow/providers/google/cloud/hooks/datafusion.py | 21 +-
.../operators/cloud/datafusion.rst | 2 +-
.../google/cloud/hooks/test_datafusion.py | 25 +-
.../google/cloud/datafusion/example_datafusion.py | 40 ++-
.../cloud/datafusion/example_datafusion_async.py | 286 ---------------------
5 files changed, 80 insertions(+), 294 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py
b/airflow/providers/google/cloud/hooks/datafusion.py
index dcf51357c6..54ae768f15 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -153,7 +153,7 @@ class DataFusionHook(GoogleBaseHook):
return os.path.join(instance_url, "v3", "namespaces",
quote(namespace), "apps")
def _cdap_request(
- self, url: str, method: str, body: list | dict | None = None
+ self, url: str, method: str, body: list | dict | None = None, params:
dict | None = None
) -> google.auth.transport.Response:
headers: dict[str, str] = {"Content-Type": "application/json"}
request = google.auth.transport.requests.Request()
@@ -163,7 +163,7 @@ class DataFusionHook(GoogleBaseHook):
payload = json.dumps(body) if body else None
- response = request(method=method, url=url, headers=headers,
body=payload)
+ response = request(method=method, url=url, headers=headers,
body=payload, params=params)
return response
@staticmethod
@@ -282,6 +282,23 @@ class DataFusionHook(GoogleBaseHook):
)
return instance
+ def get_instance_artifacts(
+ self, instance_url: str, namespace: str = "default", scope: str =
"SYSTEM"
+ ) -> Any:
+ url = os.path.join(
+ instance_url,
+ "v3",
+ "namespaces",
+ quote(namespace),
+ "artifacts",
+ )
+ response = self._cdap_request(url=url, method="GET", params={"scope":
scope})
+ self._check_response_status_and_data(
+ response, f"Retrieving an instance artifacts failed with code
{response.status}"
+ )
+ content = json.loads(response.data)
+ return content
+
@GoogleBaseHook.fallback_to_default_project_id
def patch_instance(
self,
diff --git
a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
index 15b7638289..0a728cdb10 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
@@ -179,7 +179,7 @@ It is not possible to use both asynchronous and deferrable
parameters at the sam
Please, check the example of using deferrable mode:
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
-.. exampleinclude::
/../../tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+.. exampleinclude::
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
:language: python
:dedent: 4
:start-after: [START howto_cloud_data_fusion_start_pipeline_def]
diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py
b/tests/providers/google/cloud/hooks/test_datafusion.py
index 50ea6b19d0..9ee4b9d39e 100644
--- a/tests/providers/google/cloud/hooks/test_datafusion.py
+++ b/tests/providers/google/cloud/hooks/test_datafusion.py
@@ -168,6 +168,24 @@ class TestDataFusionHook:
assert result == "value"
method_mock.assert_called_once_with(name=hook._name(PROJECT_ID,
LOCATION, INSTANCE_NAME))
+ @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+ def test_get_instance_artifacts(self, mock_request, hook):
+ scope = "SYSTEM"
+ artifact = {
+ "name": "test-artifact",
+ "version": "1.2.3",
+ "scope": scope,
+ }
+ mock_request.return_value = mock.MagicMock(status=200,
data=json.dumps([artifact]))
+
+ hook.get_instance_artifacts(instance_url=INSTANCE_URL, scope=scope)
+
+ mock_request.assert_called_with(
+ url=f"{INSTANCE_URL}/v3/namespaces/default/artifacts",
+ method="GET",
+ params={"scope": scope},
+ )
+
@mock.patch("google.auth.transport.requests.Request")
@mock.patch(HOOK_STR.format("DataFusionHook.get_credentials"))
def test_cdap_request(self, get_credentials_mock, mock_request, hook):
@@ -177,14 +195,17 @@ class TestDataFusionHook:
request = mock_request.return_value
request.return_value = mock.MagicMock()
body = {"data": "value"}
+ params = {"param_key": "param_value"}
- result = hook._cdap_request(url=url, method=method, body=body)
+ result = hook._cdap_request(url=url, method=method, body=body,
params=params)
mock_request.assert_called_once_with()
get_credentials_mock.assert_called_once_with()
get_credentials_mock.return_value.before_request.assert_called_once_with(
request=request, method=method, url=url, headers=headers
)
- request.assert_called_once_with(method=method, url=url,
headers=headers, body=json.dumps(body))
+ request.assert_called_once_with(
+ method=method, url=url, headers=headers, body=json.dumps(body),
params=params
+ )
assert result == request.return_value
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
diff --git
a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4b26639d15..c4926e3780 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -23,6 +23,8 @@ import os
from datetime import datetime
from airflow import models
+from airflow.decorators import task
+from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -61,7 +63,7 @@ PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
- "version": "6.8.3",
+ "version": "{{
task_instance.xcom_pull(task_ids='get_artifacts_versions')['cdap-data-pipeline']
}}",
"scope": "SYSTEM",
},
"description": "Data Pipeline Application",
@@ -82,7 +84,12 @@ PIPELINE = {
"name": "GCSFile",
"type": "batchsource",
"label": "GCS",
- "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
+ "artifact": {
+ "name": "google-cloud",
+ "version": "{{
task_instance.xcom_pull(task_ids='get_artifacts_versions')\
+ ['google-cloud'] }}",
+ "scope": "SYSTEM",
+ },
"properties": {
"project": "auto-detect",
"format": "text",
@@ -111,7 +118,12 @@ PIPELINE = {
"name": "GCS",
"type": "batchsink",
"label": "GCS2",
- "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
+ "artifact": {
+ "name": "google-cloud",
+ "version": "{{
task_instance.xcom_pull(task_ids='get_artifacts_versions')\
+ ['google-cloud'] }}",
+ "scope": "SYSTEM",
+ },
"properties": {
"project": "auto-detect",
"suffix": "yyyy-MM-dd-HH-mm",
@@ -147,6 +159,9 @@ PIPELINE = {
}
# [END howto_data_fusion_env_variables]
+CloudDataFusionCreatePipelineOperator.template_fields += ("pipeline",)
+
+
with models.DAG(
DAG_ID,
start_date=datetime(2021, 1, 1),
@@ -196,6 +211,13 @@ with models.DAG(
)
# [END howto_cloud_data_fusion_update_instance_operator]
+ @task(task_id="get_artifacts_versions")
+ def get_artifacts_versions(ti) -> dict:
+ hook = DataFusionHook()
+ instance_url = ti.xcom_pull(task_ids="get_instance",
key="return_value")["apiEndpoint"]
+ artifacts = hook.get_instance_artifacts(instance_url=instance_url,
namespace="default")
+ return {item["name"]: item["version"] for item in artifacts}
+
# [START howto_cloud_data_fusion_create_pipeline]
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
@@ -221,6 +243,16 @@ with models.DAG(
)
# [END howto_cloud_data_fusion_start_pipeline]
+ # [START howto_cloud_data_fusion_start_pipeline_def]
+ start_pipeline_def = CloudDataFusionStartPipelineOperator(
+ location=LOCATION,
+ pipeline_name=PIPELINE_NAME,
+ instance_name=INSTANCE_NAME,
+ task_id="start_pipeline_def",
+ deferrable=True,
+ )
+ # [END howto_cloud_data_fusion_start_pipeline_def]
+
# [START howto_cloud_data_fusion_start_pipeline_async]
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
@@ -284,10 +316,12 @@ with models.DAG(
# TEST BODY
>> create_instance
>> get_instance
+ >> get_artifacts_versions()
>> restart_instance
>> update_instance
>> create_pipeline
>> list_pipelines
+ >> start_pipeline_def
>> start_pipeline_async
>> start_pipeline_sensor
>> start_pipeline
diff --git
a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
deleted file mode 100644
index 2606d30ab8..0000000000
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example Airflow DAG that shows how to use DataFusion.
-"""
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow import models
-from airflow.providers.google.cloud.operators.datafusion import (
- CloudDataFusionCreateInstanceOperator,
- CloudDataFusionCreatePipelineOperator,
- CloudDataFusionDeleteInstanceOperator,
- CloudDataFusionDeletePipelineOperator,
- CloudDataFusionGetInstanceOperator,
- CloudDataFusionListPipelinesOperator,
- CloudDataFusionRestartInstanceOperator,
- CloudDataFusionStartPipelineOperator,
- CloudDataFusionStopPipelineOperator,
- CloudDataFusionUpdateInstanceOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-# [START howto_data_fusion_env_variables]
-SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_datafusion_async"
-LOCATION = "europe-north1"
-INSTANCE_NAME = f"async-df-{ENV_ID}".replace("_", "-")
-INSTANCE = {
- "type": "BASIC",
- "displayName": INSTANCE_NAME,
- "dataprocServiceAccount": SERVICE_ACCOUNT,
-}
-
-BUCKET_NAME_1 = f"a-bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-")
-BUCKET_NAME_2 = f"a-bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-")
-BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
-BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
-
-PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-")
-PIPELINE = {
- "artifact": {
- "name": "cdap-data-pipeline",
- "version": "6.8.3",
- "scope": "SYSTEM",
- },
- "description": "Data Pipeline Application",
- "name": PIPELINE_NAME,
- "config": {
- "resources": {"memoryMB": 2048, "virtualCores": 1},
- "driverResources": {"memoryMB": 2048, "virtualCores": 1},
- "connections": [{"from": "GCS", "to": "GCS2"}],
- "comments": [],
- "postActions": [],
- "properties": {},
- "processTimingEnabled": "true",
- "stageLoggingEnabled": "false",
- "stages": [
- {
- "name": "GCS",
- "plugin": {
- "name": "GCSFile",
- "type": "batchsource",
- "label": "GCS",
- "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
- "properties": {
- "project": "auto-detect",
- "format": "text",
- "skipHeader": "false",
- "serviceFilePath": "auto-detect",
- "filenameOnly": "false",
- "recursive": "false",
- "encrypted": "false",
- "schema":
'{"type":"record","name":"textfile","fields":[{"name"\
-
:"offset","type":"long"},{"name":"body","type":"string"}]}',
- "path": BUCKET_NAME_1_URI,
- "referenceName": "foo_bucket",
- "useConnection": "false",
- "serviceAccountType": "filePath",
- "sampleSize": "1000",
- "fileEncoding": "UTF-8",
- },
- },
- "outputSchema": '{"type":"record","name":"textfile","fields"\
-
:[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
- "id": "GCS",
- },
- {
- "name": "GCS2",
- "plugin": {
- "name": "GCS",
- "type": "batchsink",
- "label": "GCS2",
- "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
- "properties": {
- "project": "auto-detect",
- "suffix": "yyyy-MM-dd-HH-mm",
- "format": "json",
- "serviceFilePath": "auto-detect",
- "location": "us",
- "schema":
'{"type":"record","name":"textfile","fields":[{"name"\
-
:"offset","type":"long"},{"name":"body","type":"string"}]}',
- "referenceName": "bar",
- "path": BUCKET_NAME_2_URI,
- "serviceAccountType": "filePath",
- "contentType": "application/octet-stream",
- },
- },
- "outputSchema": '{"type":"record","name":"textfile","fields"\
-
:[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
- "inputSchema": [
- {
- "name": "GCS",
- "schema":
'{"type":"record","name":"textfile","fields":[{"name"\
-
:"offset","type":"long"},{"name":"body","type":"string"}]}',
- }
- ],
- "id": "GCS2",
- },
- ],
- "schedule": "0 * * * *",
- "engine": "spark",
- "numOfRecordsPreview": 100,
- "description": "Data Pipeline Application",
- "maxConcurrentRuns": 1,
- },
-}
-# [END howto_data_fusion_env_variables]
-
-with models.DAG(
- DAG_ID,
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example", "datafusion", "deferrable"],
-) as dag:
- create_bucket1 = GCSCreateBucketOperator(
- task_id="create_bucket1",
- bucket_name=BUCKET_NAME_1,
- project_id=PROJECT_ID,
- )
-
- create_bucket2 = GCSCreateBucketOperator(
- task_id="create_bucket2",
- bucket_name=BUCKET_NAME_2,
- project_id=PROJECT_ID,
- )
-
- # [START howto_cloud_data_fusion_create_instance_operator]
- create_instance = CloudDataFusionCreateInstanceOperator(
- location=LOCATION,
- instance_name=INSTANCE_NAME,
- instance=INSTANCE,
- task_id="create_instance",
- )
- # [END howto_cloud_data_fusion_create_instance_operator]
-
- # [START howto_cloud_data_fusion_get_instance_operator]
- get_instance = CloudDataFusionGetInstanceOperator(
- location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
- )
- # [END howto_cloud_data_fusion_get_instance_operator]
-
- # [START howto_cloud_data_fusion_restart_instance_operator]
- restart_instance = CloudDataFusionRestartInstanceOperator(
- location=LOCATION, instance_name=INSTANCE_NAME,
task_id="restart_instance"
- )
- # [END howto_cloud_data_fusion_restart_instance_operator]
-
- # [START howto_cloud_data_fusion_update_instance_operator]
- update_instance = CloudDataFusionUpdateInstanceOperator(
- location=LOCATION,
- instance_name=INSTANCE_NAME,
- instance=INSTANCE,
- update_mask="",
- task_id="update_instance",
- )
- # [END howto_cloud_data_fusion_update_instance_operator]
-
- # [START howto_cloud_data_fusion_create_pipeline]
- create_pipeline = CloudDataFusionCreatePipelineOperator(
- location=LOCATION,
- pipeline_name=PIPELINE_NAME,
- pipeline=PIPELINE,
- instance_name=INSTANCE_NAME,
- task_id="create_pipeline",
- )
- # [END howto_cloud_data_fusion_create_pipeline]
-
- # [START howto_cloud_data_fusion_list_pipelines]
- list_pipelines = CloudDataFusionListPipelinesOperator(
- location=LOCATION, instance_name=INSTANCE_NAME,
task_id="list_pipelines"
- )
- # [END howto_cloud_data_fusion_list_pipelines]
-
- # [START howto_cloud_data_fusion_start_pipeline_def]
- start_pipeline_def = CloudDataFusionStartPipelineOperator(
- location=LOCATION,
- pipeline_name=PIPELINE_NAME,
- instance_name=INSTANCE_NAME,
- deferrable=True,
- task_id="start_pipeline_def",
- )
- # [END howto_cloud_data_fusion_start_pipeline_def]
-
- # [START howto_cloud_data_fusion_stop_pipeline]
- stop_pipeline = CloudDataFusionStopPipelineOperator(
- location=LOCATION,
- pipeline_name=PIPELINE_NAME,
- instance_name=INSTANCE_NAME,
- task_id="stop_pipeline",
- )
- # [END howto_cloud_data_fusion_stop_pipeline]
-
- # [START howto_cloud_data_fusion_delete_pipeline]
- delete_pipeline = CloudDataFusionDeletePipelineOperator(
- location=LOCATION,
- pipeline_name=PIPELINE_NAME,
- instance_name=INSTANCE_NAME,
- task_id="delete_pipeline",
- trigger_rule=TriggerRule.ALL_DONE,
- )
- # [END howto_cloud_data_fusion_delete_pipeline]
-
- # [START howto_cloud_data_fusion_delete_instance_operator]
- delete_instance = CloudDataFusionDeleteInstanceOperator(
- location=LOCATION,
- instance_name=INSTANCE_NAME,
- task_id="delete_instance",
- trigger_rule=TriggerRule.ALL_DONE,
- )
- # [END howto_cloud_data_fusion_delete_instance_operator]
-
- delete_bucket1 = GCSDeleteBucketOperator(
- task_id="delete_bucket1", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
- )
- delete_bucket2 = GCSDeleteBucketOperator(
- task_id="delete_bucket2", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
- )
-
- (
- # TEST SETUP
- [create_bucket1, create_bucket2]
- # TEST BODY
- >> create_instance
- >> get_instance
- >> restart_instance
- >> update_instance
- >> create_pipeline
- >> list_pipelines
- >> start_pipeline_def
- >> stop_pipeline
- >> delete_pipeline
- >> delete_instance
- # TEST TEARDOWN
- >> [delete_bucket1, delete_bucket2]
- )
-
- from tests.system.utils.watcher import watcher
-
-# This test needs watcher in order to properly mark success/failure
-# when "tearDown" task with trigger rule is part of the DAG
-list(dag.tasks) >> watcher()
-
-from tests.system.utils import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)