This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0a2f1810aa3 Propogate `verify` and `botocore_config` in batch triggers
(#67508)
0a2f1810aa3 is described below
commit 0a2f1810aa32cfb0e47ce452779c76f437a2214a
Author: Karsh Vashi <[email protected]>
AuthorDate: Tue May 26 00:19:03 2026 +0100
Propogate `verify` and `botocore_config` in batch triggers (#67508)
---
.../providers/amazon/aws/operators/batch.py | 10 +-
.../airflow/providers/amazon/aws/triggers/batch.py | 20 +++-
.../tests/unit/amazon/aws/triggers/test_batch.py | 101 +++++++++++++++++++--
3 files changed, 119 insertions(+), 12 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
index 84479005cb6..26c0f2cabea 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
@@ -238,6 +238,8 @@ class BatchOperator(AwsBaseOperator[BatchClientHook]):
aws_conn_id=self.aws_conn_id,
region_name=self.region_name,
waiter_delay=self.poll_interval,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
@@ -514,7 +516,13 @@ class
BatchCreateComputeEnvironmentOperator(AwsBaseOperator[BatchClientHook]):
if self.deferrable:
self.defer(
trigger=BatchCreateComputeEnvironmentTrigger(
- arn, self.poll_interval, self.max_retries,
self.aws_conn_id, self.region_name
+ compute_env_arn=arn,
+ waiter_delay=self.poll_interval,
+ waiter_max_attempts=self.max_retries,
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
),
method_name="execute_complete",
)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/triggers/batch.py
b/providers/amazon/src/airflow/providers/amazon/aws/triggers/batch.py
index 9d97ead007b..14b369f8bd0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/batch.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/batch.py
@@ -40,11 +40,13 @@ class BatchJobTrigger(AwsBaseWaiterTrigger):
def __init__(
self,
+ *,
job_id: str | None,
region_name: str | None = None,
aws_conn_id: str | None = "aws_default",
waiter_delay: int = 5,
waiter_max_attempts: int = 720,
+ **kwargs,
):
super().__init__(
serialized_fields={"job_id": job_id},
@@ -59,10 +61,16 @@ class BatchJobTrigger(AwsBaseWaiterTrigger):
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return BatchClientHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
+ return BatchClientHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
class BatchCreateComputeEnvironmentTrigger(AwsBaseWaiterTrigger):
@@ -78,11 +86,13 @@ class
BatchCreateComputeEnvironmentTrigger(AwsBaseWaiterTrigger):
def __init__(
self,
+ *,
compute_env_arn: str,
waiter_delay: int = 30,
waiter_max_attempts: int = 10,
aws_conn_id: str | None = "aws_default",
region_name: str | None = None,
+ **kwargs,
):
super().__init__(
serialized_fields={"compute_env_arn": compute_env_arn},
@@ -96,7 +106,13 @@ class
BatchCreateComputeEnvironmentTrigger(AwsBaseWaiterTrigger):
waiter_max_attempts=waiter_max_attempts,
aws_conn_id=aws_conn_id,
region_name=region_name,
+ **kwargs,
)
def hook(self) -> AwsGenericHook:
- return BatchClientHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
+ return BatchClientHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_batch.py
b/providers/amazon/tests/unit/amazon/aws/triggers/test_batch.py
index ef6e22e965d..1b8ece6ace4 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_batch.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_batch.py
@@ -14,22 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
from __future__ import annotations
-from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger
+from unittest.mock import patch
+
+from airflow.providers.amazon.aws.triggers.batch import (
+ BatchCreateComputeEnvironmentTrigger,
+ BatchJobTrigger,
+)
class TestBatchJobTrigger:
def test_serialization(self):
- job_id = "test_job_id"
- aws_conn_id = "aws_default"
- region_name = "us-west-2"
-
trigger = BatchJobTrigger(
- job_id=job_id,
- aws_conn_id=aws_conn_id,
- region_name=region_name,
+ job_id="test_job_id",
+ aws_conn_id="aws_default",
+ region_name="us-west-2",
)
classpath, kwargs = trigger.serialize()
@@ -41,3 +41,86 @@ class TestBatchJobTrigger:
"aws_conn_id": "aws_default",
"region_name": "us-west-2",
}
+
+ def test_serialization_with_verify_and_botocore_config(self):
+ trigger = BatchJobTrigger(
+ job_id="test_job_id",
+ aws_conn_id="aws_default",
+ region_name="us-west-2",
+ verify=False,
+ botocore_config={"connect_timeout": 30},
+ )
+
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.batch.BatchJobTrigger"
+ assert kwargs["verify"] is False
+ assert kwargs["botocore_config"] == {"connect_timeout": 30}
+
+ @patch("airflow.providers.amazon.aws.triggers.batch.BatchClientHook")
+ def test_hook_propagates_verify_and_botocore_config(self, mock_hook_cls):
+ trigger = BatchJobTrigger(
+ job_id="test_job_id",
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ botocore_config={"read_timeout": 60},
+ )
+
+ trigger.hook()
+
+ mock_hook_cls.assert_called_once_with(
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ config={"read_timeout": 60},
+ )
+
+
+class TestBatchCreateComputeEnvironmentTrigger:
+ def test_serialization(self):
+ trigger = BatchCreateComputeEnvironmentTrigger(
+
compute_env_arn="arn:aws:batch:us-east-1:123456789012:compute-environment/test",
+ aws_conn_id="aws_default",
+ region_name="us-east-1",
+ )
+
+ classpath, kwargs = trigger.serialize()
+ assert classpath ==
"airflow.providers.amazon.aws.triggers.batch.BatchCreateComputeEnvironmentTrigger"
+ assert kwargs == {
+ "compute_env_arn":
"arn:aws:batch:us-east-1:123456789012:compute-environment/test",
+ "waiter_delay": 30,
+ "waiter_max_attempts": 10,
+ "aws_conn_id": "aws_default",
+ "region_name": "us-east-1",
+ }
+
+ def test_serialization_with_verify_and_botocore_config(self):
+ trigger = BatchCreateComputeEnvironmentTrigger(
+
compute_env_arn="arn:aws:batch:us-east-1:123456789012:compute-environment/test",
+ aws_conn_id="aws_default",
+ verify=False,
+ botocore_config={"connect_timeout": 30},
+ )
+
+ classpath, kwargs = trigger.serialize()
+ assert kwargs["verify"] is False
+ assert kwargs["botocore_config"] == {"connect_timeout": 30}
+
+ @patch("airflow.providers.amazon.aws.triggers.batch.BatchClientHook")
+ def test_hook_propagates_verify_and_botocore_config(self, mock_hook_cls):
+ trigger = BatchCreateComputeEnvironmentTrigger(
+
compute_env_arn="arn:aws:batch:us-east-1:123456789012:compute-environment/test",
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ botocore_config={"read_timeout": 60},
+ )
+
+ trigger.hook()
+
+ mock_hook_cls.assert_called_once_with(
+ aws_conn_id="test_conn",
+ region_name="eu-west-1",
+ verify="/path/to/ca-bundle.crt",
+ config={"read_timeout": 60},
+ )