This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e72cfd7d9e8 Add CloudComposerTriggerDAGRunOperator for Cloud Composer service (#55256)
e72cfd7d9e8 is described below
commit e72cfd7d9e8b73fb5bdad9c0a0bcc83fa0cad6a3
Author: Maksim <[email protected]>
AuthorDate: Fri Sep 5 22:50:52 2025 +0200
Add CloudComposerTriggerDAGRunOperator for Cloud Composer service (#55256)
---
.../google/docs/operators/cloud/cloud_composer.rst | 12 +++
.../providers/google/cloud/hooks/cloud_composer.py | 66 ++++++++++++++++-
.../google/cloud/operators/cloud_composer.py | 85 +++++++++++++++++++++-
.../cloud/composer/example_cloud_composer.py | 12 +++
.../unit/google/cloud/hooks/test_cloud_composer.py | 26 +++++++
.../google/cloud/operators/test_cloud_composer.py | 36 +++++++++
6 files changed, 235 insertions(+), 2 deletions(-)
diff --git a/providers/google/docs/operators/cloud/cloud_composer.rst b/providers/google/docs/operators/cloud/cloud_composer.rst
index cd0ada3c38f..88381b48438 100644
--- a/providers/google/docs/operators/cloud/cloud_composer.rst
+++ b/providers/google/docs/operators/cloud/cloud_composer.rst
@@ -197,3 +197,15 @@ or you can define the same sensor in the deferrable mode:
:dedent: 4
:start-after: [START howto_sensor_dag_run_deferrable_mode]
:end-before: [END howto_sensor_dag_run_deferrable_mode]
+
+Trigger a DAG run
+-----------------
+
+To trigger a DAG in another Composer environment, use:
+:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerTriggerDAGRunOperator`
+
+.. exampleinclude:: /../../google/tests/system/google/cloud/composer/example_cloud_composer.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_trigger_dag_run]
+ :end-before: [END howto_operator_trigger_dag_run]
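
For orientation, the new operator is meant to be used roughly as follows inside a DAG file. This is a minimal sketch mirroring the system test added later in this commit; the project, region, environment and conf values are placeholders.

from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerTriggerDAGRunOperator,
)

# Placeholders -- replace with your own project, region and target environment.
PROJECT_ID = "my-project"
REGION = "us-central1"
ENVIRONMENT_ID = "my-composer-environment"

# Trigger the DAG "airflow_monitoring" in the target Composer environment and
# pass an optional conf dict through to the created DAG run.
trigger_dag_run = CloudComposerTriggerDAGRunOperator(
    task_id="trigger_dag_run",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    composer_dag_conf={"triggered_by": "example"},
)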
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
index 5026b63005e..9c963e47b0f 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -18,12 +18,15 @@
from __future__ import annotations
import asyncio
+import json
import time
from collections.abc import MutableSequence, Sequence
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urljoin
from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.auth.transport.requests import AuthorizedSession
from google.cloud.orchestration.airflow.service_v1 import (
EnvironmentsAsyncClient,
EnvironmentsClient,
@@ -76,6 +79,34 @@ class CloudComposerHook(GoogleBaseHook, OperationHelper):
client_options=self.client_options,
)
+ def make_composer_airflow_api_request(
+ self,
+ method: str,
+ airflow_uri: str,
+ path: str,
+ data: Any | None = None,
+ timeout: float | None = None,
+ ):
+ """
+ Make a request to the Cloud Composer environment's web server.
+
+ :param method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE').
+ :param airflow_uri: The URI of the Apache Airflow Web UI hosted within this environment.
+ :param path: The path to send the request to.
+ :param data: Dictionary, list of tuples, bytes, or file-like object to send in the body of the request.
+ :param timeout: The timeout for this request.
+ """
+ authed_session = AuthorizedSession(self.get_credentials())
+
+ resp = authed_session.request(
+ method=method,
+ url=urljoin(airflow_uri, path),
+ data=data,
+ headers={"Content-Type": "application/json"},
+ timeout=timeout,
+ )
+ return resp
+
def get_operation(self, operation_name):
return self.get_environment_client().transport.operations_client.get_operation(name=operation_name)
@@ -408,6 +439,39 @@ class CloudComposerHook(GoogleBaseHook, OperationHelper):
self.log.info("Waiting for result...")
time.sleep(poll_interval)
+ def trigger_dag_run(
+ self,
+ composer_airflow_uri: str,
+ composer_dag_id: str,
+ composer_dag_conf: dict | None = None,
+ timeout: float | None = None,
+ ) -> dict:
+ """
+ Trigger a DAG run via the Apache Airflow Web UI hosted within the Composer environment.
+
+ :param composer_airflow_uri: The URI of the Apache Airflow Web UI hosted within the Composer environment.
+ :param composer_dag_id: The ID of the DAG to trigger.
+ :param composer_dag_conf: Configuration parameters for the DAG run.
+ :param timeout: The timeout for this request.
+ """
+ response = self.make_composer_airflow_api_request(
+ method="POST",
+ airflow_uri=composer_airflow_uri,
+ path=f"/api/v1/dags/{composer_dag_id}/dagRuns",
+ data=json.dumps(
+ {
+ "conf": composer_dag_conf or {},
+ }
+ ),
+ timeout=timeout,
+ )
+
+ if response.status_code != 200:
+ self.log.error(response.text)
+ response.raise_for_status()
+
+ return response.json()
+
class CloudComposerAsyncHook(GoogleBaseHook):
"""Hook for Google Cloud Composer async APIs."""
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
index 8d5a13561d4..5783dd816ad 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -21,7 +21,7 @@ import shlex
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, NotFound
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.orchestration.airflow.service_v1 import ImageVersion
from google.cloud.orchestration.airflow.service_v1.types import Environment, ExecuteAirflowCommandResponse
@@ -798,3 +798,86 @@ class CloudComposerRunAirflowCLICommandOperator(GoogleCloudBaseOperator):
"""Merge output to one string."""
result_str = "\n".join(line_dict["content"] for line_dict in result["output"])
return result_str
+
+
+class CloudComposerTriggerDAGRunOperator(GoogleCloudBaseOperator):
+ """
+ Trigger a DAG run in the provided Composer environment.
+
+ :param project_id: The ID of the Google Cloud project that the service belongs to.
+ :param region: The ID of the Google Cloud region that the service belongs to.
+ :param environment_id: The ID of the Google Cloud environment that the service belongs to.
+ :param composer_dag_id: The ID of the DAG to trigger.
+ :param composer_dag_conf: Configuration parameters for the DAG run.
+ :param timeout: The timeout for this request.
+ :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ template_fields = (
+ "project_id",
+ "region",
+ "environment_id",
+ "composer_dag_id",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ region: str,
+ environment_id: str,
+ composer_dag_id: str,
+ composer_dag_conf: dict | None = None,
+ timeout: float | None = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.environment_id = environment_id
+ self.composer_dag_id = composer_dag_id
+ self.composer_dag_conf = composer_dag_conf or {}
+ self.timeout = timeout
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = CloudComposerHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ try:
+ environment = hook.get_environment(
+ project_id=self.project_id,
+ region=self.region,
+ environment_id=self.environment_id,
+ timeout=self.timeout,
+ )
+ except NotFound as not_found_err:
+ self.log.info("The Composer environment %s does not exist.",
self.environment_id)
+ raise AirflowException(not_found_err)
+ composer_airflow_uri = environment.config.airflow_uri
+
+ self.log.info(
+ "Triggering the DAG %s on the %s environment...",
self.composer_dag_id, self.environment_id
+ )
+ dag_run = hook.trigger_dag_run(
+ composer_airflow_uri=composer_airflow_uri,
+ composer_dag_id=self.composer_dag_id,
+ composer_dag_conf=self.composer_dag_conf,
+ timeout=self.timeout,
+ )
+ self.log.info("The DAG %s was triggered with Run ID: %s",
self.composer_dag_id, dag_run["dag_run_id"])
+
+ return dag_run
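
Because execute() returns the parsed API response, the new run's metadata is also pushed to XCom (under the default return_value key, assuming do_xcom_push is left enabled), so a downstream task can pick it up. A rough sketch; report_triggered_run and the trigger_dag_run task name are illustrative only.

from airflow.decorators import task

@task
def report_triggered_run(dag_run: dict) -> None:
    # dag_run is the JSON body returned by the Airflow REST API, for example
    # {"dag_run_id": "...", "state": "queued", ...}.
    print(f"Triggered run {dag_run['dag_run_id']} in state {dag_run['state']}")

# Inside a DAG definition, wire the operator's XCom return value into the task:
# report_triggered_run(trigger_dag_run.output)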
diff --git a/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py b/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
index 48cccbaedb1..21e635a3853 100644
--- a/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
+++ b/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
@@ -40,6 +40,7 @@ from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerListEnvironmentsOperator,
CloudComposerListImageVersionsOperator,
CloudComposerRunAirflowCLICommandOperator,
+ CloudComposerTriggerDAGRunOperator,
CloudComposerUpdateEnvironmentOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
@@ -218,6 +219,16 @@ with DAG(
)
# [END howto_sensor_dag_run_deferrable_mode]
+ # [START howto_operator_trigger_dag_run]
+ trigger_dag_run = CloudComposerTriggerDAGRunOperator(
+ task_id="trigger_dag_run",
+ project_id=PROJECT_ID,
+ region=REGION,
+ environment_id=ENVIRONMENT_ID,
+ composer_dag_id="airflow_monitoring",
+ )
+ # [END howto_operator_trigger_dag_run]
+
# [START howto_operator_delete_composer_environment]
delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="delete_env",
@@ -250,6 +261,7 @@ with DAG(
[update_env, defer_update_env],
[run_airflow_cli_cmd, defer_run_airflow_cli_cmd],
[dag_run_sensor, defer_dag_run_sensor],
+ trigger_dag_run,
# TEST TEARDOWN
[delete_env, defer_delete_env],
)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py b/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
index a7ef0e347fe..4a371794793 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import json
from unittest import mock
from unittest.mock import AsyncMock
@@ -56,6 +57,10 @@ TEST_METADATA = [("key", "value")]
TEST_PARENT = "test-parent"
TEST_NAME = "test-name"
+TEST_COMPOSER_AIRFLOW_URI = "test-composer-airflow-uri"
+TEST_COMPOSER_DAG_ID = "test-composer-dag-id"
+TEST_COMPOSER_DAG_CONF = {"test-key": "test-value"}
+
BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
COMPOSER_STRING = "airflow.providers.google.cloud.hooks.cloud_composer.{}"
@@ -257,6 +262,27 @@ class TestCloudComposerHook:
metadata=TEST_METADATA,
)
+ @mock.patch(COMPOSER_STRING.format("CloudComposerHook.make_composer_airflow_api_request"))
+ def test_trigger_dag_run(self, mock_composer_airflow_api_request) -> None:
+ self.hook.get_credentials = mock.MagicMock()
+ self.hook.trigger_dag_run(
+ composer_airflow_uri=TEST_COMPOSER_AIRFLOW_URI,
+ composer_dag_id=TEST_COMPOSER_DAG_ID,
+ composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+ timeout=TEST_TIMEOUT,
+ )
+ mock_composer_airflow_api_request.assert_called_once_with(
+ method="POST",
+ airflow_uri=TEST_COMPOSER_AIRFLOW_URI,
+ path=f"/api/v1/dags/{TEST_COMPOSER_DAG_ID}/dagRuns",
+ data=json.dumps(
+ {
+ "conf": TEST_COMPOSER_DAG_CONF,
+ }
+ ),
+ timeout=TEST_TIMEOUT,
+ )
+
class TestCloudComposerAsyncHook:
def setup_method(self, method):
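
The unit test above only asserts how the request is composed. A hypothetical companion test (not part of this commit) could cover the non-200 branch of trigger_dag_run, which surfaces the HTTP error via raise_for_status(); a sketch, assuming the hook can be instantiated directly in the test:

from unittest import mock

import pytest
import requests

from airflow.providers.google.cloud.hooks.cloud_composer import CloudComposerHook

COMPOSER_STRING = "airflow.providers.google.cloud.hooks.cloud_composer.{}"

# Hypothetical test: a non-200 response from the Airflow API is raised as HTTPError.
@mock.patch(COMPOSER_STRING.format("CloudComposerHook.make_composer_airflow_api_request"))
def test_trigger_dag_run_http_error(mock_api_request):
    mock_response = mock.MagicMock(status_code=403, text="forbidden")
    mock_response.raise_for_status.side_effect = requests.HTTPError("403 Forbidden")
    mock_api_request.return_value = mock_response

    hook = CloudComposerHook(gcp_conn_id="google_cloud_default")
    with pytest.raises(requests.HTTPError):
        hook.trigger_dag_run(
            composer_airflow_uri="https://example.invalid",
            composer_dag_id="some_dag",
        )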
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
index e6da6cf5cf9..0ca1738961c 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
@@ -29,6 +29,7 @@ from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerListEnvironmentsOperator,
CloudComposerListImageVersionsOperator,
CloudComposerRunAirflowCLICommandOperator,
+ CloudComposerTriggerDAGRunOperator,
CloudComposerUpdateEnvironmentOperator,
)
from airflow.providers.google.cloud.triggers.cloud_composer import (
@@ -67,6 +68,9 @@ TEST_METADATA = [("key", "value")]
TEST_PARENT = "test-parent"
TEST_NAME = "test-name"
+TEST_COMPOSER_DAG_ID = "test-composer-dag-id"
+TEST_COMPOSER_DAG_CONF = {"test-key": "test-value"}
+
COMPOSER_STRING = "airflow.providers.google.cloud.operators.cloud_composer.{}"
COMPOSER_TRIGGERS_STRING = "airflow.providers.google.cloud.triggers.cloud_composer.{}"
@@ -375,3 +379,35 @@ class TestCloudComposerRunAirflowCLICommandOperator:
assert isinstance(exc.value.trigger, CloudComposerAirflowCLICommandTrigger)
assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
+
+
+class TestCloudComposerTriggerDAGRunOperator:
+ @mock.patch(COMPOSER_STRING.format("CloudComposerHook"))
+ def test_execute(self, mock_hook) -> None:
+ op = CloudComposerTriggerDAGRunOperator(
+ task_id=TASK_ID,
+ project_id=TEST_GCP_PROJECT,
+ region=TEST_GCP_REGION,
+ environment_id=TEST_ENVIRONMENT_ID,
+ composer_dag_id=TEST_COMPOSER_DAG_ID,
+ composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+ gcp_conn_id=TEST_GCP_CONN_ID,
+ timeout=TEST_TIMEOUT,
+ )
+ op.execute(mock.MagicMock())
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=TEST_GCP_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.get_environment.assert_called_once_with(
+ project_id=TEST_GCP_PROJECT,
+ region=TEST_GCP_REGION,
+ environment_id=TEST_ENVIRONMENT_ID,
+ timeout=TEST_TIMEOUT,
+ )
+ mock_hook.return_value.trigger_dag_run.assert_called_once_with(
+ composer_airflow_uri=mock_hook.return_value.get_environment.return_value.config.airflow_uri,
+ composer_dag_id=TEST_COMPOSER_DAG_ID,
+ composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+ timeout=TEST_TIMEOUT,
+ )
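
The operator test covers the happy path. A hypothetical companion test (not part of this commit) could exercise the NotFound branch of execute(), which re-raises the error as AirflowException; a sketch with placeholder project, region and environment values:

from unittest import mock

import pytest
from google.api_core.exceptions import NotFound

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerTriggerDAGRunOperator,
)

COMPOSER_STRING = "airflow.providers.google.cloud.operators.cloud_composer.{}"

# Hypothetical test: a missing environment is surfaced as AirflowException.
@mock.patch(COMPOSER_STRING.format("CloudComposerHook"))
def test_execute_environment_not_found(mock_hook):
    mock_hook.return_value.get_environment.side_effect = NotFound("environment not found")
    op = CloudComposerTriggerDAGRunOperator(
        task_id="trigger_dag_run",
        project_id="test-project",
        region="us-central1",
        environment_id="missing-environment",
        composer_dag_id="some_dag",
    )
    with pytest.raises(AirflowException):
        op.execute(mock.MagicMock())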