This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b1196460db Add `check_interval` and `max_attempts` as parameter of 
`DynamoDBToS3Operator` (#34972)
b1196460db is described below

commit b1196460db1a21b2c6c3ef2e841fc6d0c22afe97
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Mon Oct 16 15:09:20 2023 -0400

    Add `check_interval` and `max_attempts` as parameter of 
`DynamoDBToS3Operator` (#34972)
---
 airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py   | 14 +++++++++++++-
 .../system/providers/amazon/aws/example_dynamodb_to_s3.py  |  2 ++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py 
b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index b83ff48906..5351f3ff7c 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -92,6 +92,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
      the Unix epoch. The table export will be a snapshot of the table's state 
at this point in time.
     :param export_format: The format for the exported data. Valid values for 
ExportFormat are DYNAMODB_JSON
      or ION.
+    :param check_interval: The amount of time in seconds to wait between 
attempts. Only if ``export_time`` is
+        provided.
+    :param max_attempts: The maximum number of attempts to be made. Only if 
``export_time`` is provided.
     """
 
     template_fields: Sequence[str] = (
@@ -104,6 +107,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         "process_func",
         "export_time",
         "export_format",
+        "check_interval",
+        "max_attempts",
     )
 
     template_fields_renderers = {
@@ -121,6 +126,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         process_func: Callable[[dict[str, Any]], bytes] = 
_convert_item_to_json_bytes,
         export_time: datetime | None = None,
         export_format: str = "DYNAMODB_JSON",
+        check_interval: int = 30,
+        max_attempts: int = 60,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -132,6 +139,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         self.s3_key_prefix = s3_key_prefix
         self.export_time = export_time
         self.export_format = export_format
+        self.check_interval = check_interval
+        self.max_attempts = max_attempts
 
     @cached_property
     def hook(self):
@@ -164,7 +173,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator):
         )
         waiter = self.hook.get_waiter("export_table")
         export_arn = response.get("ExportDescription", {}).get("ExportArn")
-        waiter.wait(ExportArn=export_arn)
+        waiter.wait(
+            ExportArn=export_arn,
+            WaiterConfig={"Delay": self.check_interval, "MaxAttempts": 
self.max_attempts},
+        )
 
     def _export_entire_data(self):
         """Export all data from the table."""
diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py 
b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
index 70ea9ad77e..dc08e2d5b9 100644
--- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py
@@ -172,6 +172,8 @@ with DAG(
         s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
     )
     # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time]
+    # This operation can take a long time to complete
+    backup_db_to_point_in_time.max_attempts = 90
 
     delete_table = delete_dynamodb_table(table_name=table_name)
 

Reply via email to