This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 28e93c14410 Propogate verify and botocore_config in redshift cluster
triggers (#67876)
28e93c14410 is described below
commit 28e93c1441085de11c4c9ce1d03a750701992708
Author: Karsh Vashi <[email protected]>
AuthorDate: Tue Jun 2 21:30:15 2026 +0100
Propogate verify and botocore_config in redshift cluster triggers (#67876)
---
.../amazon/aws/operators/redshift_cluster.py | 15 +++
.../amazon/aws/triggers/redshift_cluster.py | 75 +++++++++++++-
.../amazon/aws/triggers/test_redshift_cluster.py | 108 +++++++++++++++++++++
3 files changed, 193 insertions(+), 5 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
index c050987df1d..ea0d1272db8 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -378,6 +378,9 @@ class
RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempt,
aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
@@ -497,6 +500,9 @@ class
RedshiftCreateClusterSnapshotOperator(AwsBaseOperator[RedshiftHook]):
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempt,
aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the timeout
does not restart
@@ -668,6 +674,9 @@ class
RedshiftResumeClusterOperator(AwsBaseOperator[RedshiftHook]):
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempts,
aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the
timeout does not restart
@@ -775,6 +784,9 @@ class
RedshiftPauseClusterOperator(AwsBaseOperator[RedshiftHook]):
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempts,
aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
# timeout is set to ensure that if a trigger dies, the
timeout does not restart
@@ -901,6 +913,9 @@ class
RedshiftDeleteClusterOperator(AwsBaseOperator[RedshiftHook]):
waiter_delay=self.poll_interval,
waiter_max_attempts=self.max_attempts,
aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
index ad299595bf8..5e6576d3931 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -39,14 +39,20 @@ class RedshiftCreateClusterTrigger(AwsBaseWaiterTrigger):
:param waiter_delay: The amount of time in seconds to wait between
attempts.
:param waiter_max_attempts: The maximum number of attempts to be made.
:param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
self,
+ *,
cluster_identifier: str,
aws_conn_id: str | None = "aws_default",
+ region_name: str | None = None,
waiter_delay: int = 15,
waiter_max_attempts: int = 999999,
+ **kwargs,
):
super().__init__(
serialized_fields={"cluster_identifier": cluster_identifier},
@@ -59,10 +65,17 @@ class RedshiftCreateClusterTrigger(AwsBaseWaiterTrigger):
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
+ region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return RedshiftHook(aws_conn_id=self.aws_conn_id)
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger):
@@ -76,14 +89,20 @@ class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger):
:param waiter_delay: The amount of time in seconds to wait between
attempts.
:param waiter_max_attempts: The maximum number of attempts to be made.
:param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
self,
+ *,
cluster_identifier: str,
aws_conn_id: str | None = "aws_default",
+ region_name: str | None = None,
waiter_delay: int = 15,
waiter_max_attempts: int = 999999,
+ **kwargs,
):
super().__init__(
serialized_fields={"cluster_identifier": cluster_identifier},
@@ -96,10 +115,17 @@ class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger):
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
+ region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return RedshiftHook(aws_conn_id=self.aws_conn_id)
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger):
@@ -113,14 +139,20 @@ class
RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger):
:param waiter_delay: The amount of time in seconds to wait between
attempts.
:param waiter_max_attempts: The maximum number of attempts to be made.
:param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
self,
+ *,
cluster_identifier: str,
aws_conn_id: str | None = "aws_default",
+ region_name: str | None = None,
waiter_delay: int = 15,
waiter_max_attempts: int = 999999,
+ **kwargs,
):
super().__init__(
serialized_fields={"cluster_identifier": cluster_identifier},
@@ -133,10 +165,17 @@ class
RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger):
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
+ region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return RedshiftHook(aws_conn_id=self.aws_conn_id)
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger):
@@ -150,14 +189,20 @@ class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger):
:param waiter_delay: The amount of time in seconds to wait between
attempts.
:param waiter_max_attempts: The maximum number of attempts to be made.
:param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
self,
+ *,
cluster_identifier: str,
aws_conn_id: str | None = "aws_default",
+ region_name: str | None = None,
waiter_delay: int = 15,
waiter_max_attempts: int = 999999,
+ **kwargs,
):
super().__init__(
serialized_fields={"cluster_identifier": cluster_identifier},
@@ -170,10 +215,17 @@ class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger):
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
+ region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return RedshiftHook(aws_conn_id=self.aws_conn_id)
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger):
@@ -184,14 +236,20 @@ class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger):
:param waiter_max_attempts: The maximum number of attempts to be made.
:param aws_conn_id: The Airflow connection used for AWS credentials.
:param waiter_delay: The amount of time in seconds to wait between
attempts.
+ :param region_name: The AWS region where the cluster is. Used to build the
hook.
+ :param verify: Whether or not to verify SSL certificates. Used to build
the hook.
+ :param botocore_config: Configuration dictionary for the botocore client.
Used to build the hook.
"""
def __init__(
self,
+ *,
cluster_identifier: str,
aws_conn_id: str | None = "aws_default",
+ region_name: str | None = None,
waiter_delay: int = 30,
waiter_max_attempts: int = 30,
+ **kwargs,
):
super().__init__(
serialized_fields={"cluster_identifier": cluster_identifier},
@@ -204,10 +262,17 @@ class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger):
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
+ region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return RedshiftHook(aws_conn_id=self.aws_conn_id)
+ return RedshiftHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class RedshiftClusterTrigger(BaseTrigger):
diff --git
a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
index 75055fe5ad6..d551308a724 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py
@@ -24,6 +24,11 @@ import pytest
from airflow.providers.amazon.aws.triggers.redshift_cluster import (
RedshiftClusterTrigger,
+ RedshiftCreateClusterSnapshotTrigger,
+ RedshiftCreateClusterTrigger,
+ RedshiftDeleteClusterTrigger,
+ RedshiftPauseClusterTrigger,
+ RedshiftResumeClusterTrigger,
)
from airflow.triggers.base import TriggerEvent
@@ -115,3 +120,106 @@ class TestRedshiftClusterTrigger:
# so we validate for length of task to be 1
assert len(task) == 1
assert TriggerEvent({"status": "error", "message": "Test exception"})
in task
+
+
+WAITER_TRIGGER_PARAMS = [
+ pytest.param(
+ RedshiftCreateClusterTrigger,
+ 15,
+ 999999,
+ id="RedshiftCreateClusterTrigger",
+ ),
+ pytest.param(
+ RedshiftPauseClusterTrigger,
+ 15,
+ 999999,
+ id="RedshiftPauseClusterTrigger",
+ ),
+ pytest.param(
+ RedshiftCreateClusterSnapshotTrigger,
+ 15,
+ 999999,
+ id="RedshiftCreateClusterSnapshotTrigger",
+ ),
+ pytest.param(
+ RedshiftResumeClusterTrigger,
+ 15,
+ 999999,
+ id="RedshiftResumeClusterTrigger",
+ ),
+ pytest.param(
+ RedshiftDeleteClusterTrigger,
+ 30,
+ 30,
+ id="RedshiftDeleteClusterTrigger",
+ ),
+]
+
+
+class TestRedshiftWaiterTriggers:
+ """Tests for the five Redshift triggers that inherit from
``AwsBaseWaiterTrigger``."""
+
+ @pytest.mark.parametrize(
+ ("trigger_cls", "default_delay", "default_max_attempts"),
+ WAITER_TRIGGER_PARAMS,
+ )
+ def test_serialization(self, trigger_cls, default_delay,
default_max_attempts):
+ trigger = trigger_cls(
+ cluster_identifier="test_cluster",
+ aws_conn_id="aws_default",
+ region_name="us-east-1",
+ )
+
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
f"airflow.providers.amazon.aws.triggers.redshift_cluster.{trigger_cls.__name__}"
+ assert kwargs == {
+ "cluster_identifier": "test_cluster",
+ "waiter_delay": default_delay,
+ "waiter_max_attempts": default_max_attempts,
+ "aws_conn_id": "aws_default",
+ "region_name": "us-east-1",
+ }
+
+ @pytest.mark.parametrize(
+ ("trigger_cls", "default_delay", "default_max_attempts"),
+ WAITER_TRIGGER_PARAMS,
+ )
+ def test_serialization_with_verify_and_botocore_config(
+ self, trigger_cls, default_delay, default_max_attempts
+ ):
+ trigger = trigger_cls(
+ cluster_identifier="test_cluster",
+ aws_conn_id="aws_default",
+ verify=False,
+ botocore_config={"connect_timeout": 30},
+ )
+
+ _, kwargs = trigger.serialize()
+ assert kwargs["verify"] is False
+ assert kwargs["botocore_config"] == {"connect_timeout": 30}
+ assert "region_name" not in kwargs
+
+ @pytest.mark.parametrize(
+ ("trigger_cls", "default_delay", "default_max_attempts"),
+ WAITER_TRIGGER_PARAMS,
+ )
+
@mock.patch("airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftHook")
+ def test_hook_propagates_verify_and_botocore_config(
+ self, mock_hook_cls, trigger_cls, default_delay, default_max_attempts
+ ):
+ trigger = trigger_cls(
+ cluster_identifier="test_cluster",
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ botocore_config={"read_timeout": 60},
+ )
+
+ trigger.hook()
+
+ mock_hook_cls.assert_called_once_with(
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ config={"read_timeout": 60},
+ )