This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76f9210e5e0 Propagate AWS hook parameters through RedshiftClusterTrigger (#68925)
76f9210e5e0 is described below

commit 76f9210e5e00d3e17ca2557e475596f70b382781
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jun 24 10:41:24 2026 -0700

    Propagate AWS hook parameters through RedshiftClusterTrigger (#68925)
    
    RedshiftClusterTrigger now accepts and serializes region_name, verify, and
    botocore_config and builds its hook with them, and RedshiftClusterSensor
    forwards those values when deferring. Previously a deferred
    RedshiftClusterSensor silently dropped the region, SSL verification, and
    botocore configuration. This change completes the trigger portion of the
    Redshift cluster migration in apache/airflow#35278.
---
 .../amazon/aws/sensors/redshift_cluster.py         |  3 +++
 .../amazon/aws/triggers/redshift_cluster.py        | 25 ++++++++++++++++++++--
 .../amazon/aws/triggers/test_redshift_cluster.py   | 25 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 2 deletions(-)

diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
index d4a93dff807..d8ff70b92a4 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -89,6 +89,9 @@ class RedshiftClusterSensor(AwsBaseSensor[RedshiftHook]):
                     cluster_identifier=self.cluster_identifier,
                     target_status=self.target_status,
                     poke_interval=self.poke_interval,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
                 ),
                 method_name="execute_complete",
             )
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
index 5e6576d3931..ebd32a2b423 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import asyncio
 from collections.abc import AsyncIterator
+from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
@@ -283,6 +284,9 @@ class RedshiftClusterTrigger(BaseTrigger):
     :param cluster_identifier: unique identifier of a cluster
     :param target_status: Reference to the status which needs to be checked
     :param poke_interval:  polling period in seconds to check for the status
+    :param region_name: The AWS region where the cluster is. Used to build the hook.
+    :param verify: Whether or not to verify SSL certificates. Used to build the hook.
+    :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook.
     """
 
     def __init__(
@@ -291,12 +295,18 @@ class RedshiftClusterTrigger(BaseTrigger):
         cluster_identifier: str,
         target_status: str,
         poke_interval: float,
+        region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
     ):
         super().__init__()
         self.aws_conn_id = aws_conn_id
         self.cluster_identifier = cluster_identifier
         self.target_status = target_status
         self.poke_interval = poke_interval
+        self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
         """Serialize RedshiftClusterTrigger arguments and classpath."""
@@ -307,15 +317,26 @@ class RedshiftClusterTrigger(BaseTrigger):
                 "cluster_identifier": self.cluster_identifier,
                 "target_status": self.target_status,
                 "poke_interval": self.poke_interval,
+                "region_name": self.region_name,
+                "verify": self.verify,
+                "botocore_config": self.botocore_config,
             },
         )
 
+    @cached_property
+    def hook(self) -> RedshiftHook:
+        return RedshiftHook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
+
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Run async until the cluster status matches the target status."""
         try:
-            hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
             while True:
-                status = await 
hook.cluster_status_async(self.cluster_identifier)
+                status = await 
self.hook.cluster_status_async(self.cluster_identifier)
                 if status == self.target_status:
                     yield TriggerEvent({"status": "success", "message": 
"target state met"})
                     return
diff --git 
a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py 
b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
index d551308a724..a55494ce8ab 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
@@ -54,8 +54,33 @@ class TestRedshiftClusterTrigger:
             "cluster_identifier": "mock_cluster_identifier",
             "target_status": "available",
             "poke_interval": POLLING_PERIOD_SECONDS,
+            "region_name": None,
+            "verify": None,
+            "botocore_config": None,
         }
 
+    def test_redshift_cluster_trigger_serializes_generic_hook_params(self):
+        """Asserts the generic AWS hook params are serialized and used to 
build the hook."""
+        trigger = RedshiftClusterTrigger(
+            aws_conn_id="test_redshift_conn_id",
+            cluster_identifier="mock_cluster_identifier",
+            target_status="available",
+            poke_interval=POLLING_PERIOD_SECONDS,
+            region_name="eu-west-1",
+            verify=False,
+            botocore_config={"read_timeout": 42},
+        )
+        _, kwargs = trigger.serialize()
+        assert kwargs["region_name"] == "eu-west-1"
+        assert kwargs["verify"] is False
+        assert kwargs["botocore_config"] == {"read_timeout": 42}
+
+        hook = trigger.hook
+        assert hook.aws_conn_id == "test_redshift_conn_id"
+        assert hook._region_name == "eu-west-1"
+        assert hook._verify is False
+        assert hook._config.read_timeout == 42
+
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status_async")
     async def test_redshift_cluster_sensor_trigger_success(self, 
mock_cluster_status):

Reply via email to