This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76f9210e5e0 Propagate AWS hook parameters through
RedshiftClusterTrigger (#68925)
76f9210e5e0 is described below
commit 76f9210e5e00d3e17ca2557e475596f70b382781
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jun 24 10:41:24 2026 -0700
Propagate AWS hook parameters through RedshiftClusterTrigger (#68925)
RedshiftClusterTrigger now accepts and serializes region_name, verify, and
botocore_config and builds its hook with them, and RedshiftClusterSensor
forwards those values when deferring. Previously a deferred
RedshiftClusterSensor
silently dropped the region, SSL verification, and botocore configuration.
This change completes the trigger portion of the Redshift cluster migration in
apache/airflow#35278.
---
.../amazon/aws/sensors/redshift_cluster.py | 3 +++
.../amazon/aws/triggers/redshift_cluster.py | 25 ++++++++++++++++++++--
.../amazon/aws/triggers/test_redshift_cluster.py | 25 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 2 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
index d4a93dff807..d8ff70b92a4 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -89,6 +89,9 @@ class RedshiftClusterSensor(AwsBaseSensor[RedshiftHook]):
cluster_identifier=self.cluster_identifier,
target_status=self.target_status,
poke_interval=self.poke_interval,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
index 5e6576d3931..ebd32a2b423 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
+from functools import cached_property
from typing import TYPE_CHECKING, Any
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
@@ -283,6 +284,9 @@ class RedshiftClusterTrigger(BaseTrigger):
:param cluster_identifier: unique identifier of a cluster
:param target_status: Reference to the status which needs to be checked
:param poke_interval: polling period in seconds to check for the status
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
@@ -291,12 +295,18 @@ class RedshiftClusterTrigger(BaseTrigger):
cluster_identifier: str,
target_status: str,
poke_interval: float,
+ region_name: str | None = None,
+ verify: bool | str | None = None,
+ botocore_config: dict | None = None,
):
super().__init__()
self.aws_conn_id = aws_conn_id
self.cluster_identifier = cluster_identifier
self.target_status = target_status
self.poke_interval = poke_interval
+ self.region_name = region_name
+ self.verify = verify
+ self.botocore_config = botocore_config
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize RedshiftClusterTrigger arguments and classpath."""
@@ -307,15 +317,26 @@ class RedshiftClusterTrigger(BaseTrigger):
"cluster_identifier": self.cluster_identifier,
"target_status": self.target_status,
"poke_interval": self.poke_interval,
+ "region_name": self.region_name,
+ "verify": self.verify,
+ "botocore_config": self.botocore_config,
},
)
+ @cached_property
+ def hook(self) -> RedshiftHook:
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
+
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Run async until the cluster status matches the target status."""
try:
- hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
while True:
- status = await
hook.cluster_status_async(self.cluster_identifier)
+ status = await
self.hook.cluster_status_async(self.cluster_identifier)
if status == self.target_status:
yield TriggerEvent({"status": "success", "message":
"target state met"})
return
diff --git
a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
index d551308a724..a55494ce8ab 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
@@ -54,8 +54,33 @@ class TestRedshiftClusterTrigger:
"cluster_identifier": "mock_cluster_identifier",
"target_status": "available",
"poke_interval": POLLING_PERIOD_SECONDS,
+ "region_name": None,
+ "verify": None,
+ "botocore_config": None,
}
+ def test_redshift_cluster_trigger_serializes_generic_hook_params(self):
+ """Asserts the generic AWS hook params are serialized and used to
build the hook."""
+ trigger = RedshiftClusterTrigger(
+ aws_conn_id="test_redshift_conn_id",
+ cluster_identifier="mock_cluster_identifier",
+ target_status="available",
+ poke_interval=POLLING_PERIOD_SECONDS,
+ region_name="eu-west-1",
+ verify=False,
+ botocore_config={"read_timeout": 42},
+ )
+ _, kwargs = trigger.serialize()
+ assert kwargs["region_name"] == "eu-west-1"
+ assert kwargs["verify"] is False
+ assert kwargs["botocore_config"] == {"read_timeout": 42}
+
+ hook = trigger.hook
+ assert hook.aws_conn_id == "test_redshift_conn_id"
+ assert hook._region_name == "eu-west-1"
+ assert hook._verify is False
+ assert hook._config.read_timeout == 42
+
@pytest.mark.asyncio
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status_async")
async def test_redshift_cluster_sensor_trigger_success(self,
mock_cluster_status):