This is an automated email from the ASF dual-hosted git repository. ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b1196460db Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972) b1196460db is described below commit b1196460db1a21b2c6c3ef2e841fc6d0c22afe97 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Mon Oct 16 15:09:20 2023 -0400 Add `check_interval` and `max_attempts` as parameter of `DynamoDBToS3Operator` (#34972) --- airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 14 +++++++++++++- .../system/providers/amazon/aws/example_dynamodb_to_s3.py | 2 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index b83ff48906..5351f3ff7c 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -92,6 +92,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): the Unix epoch. The table export will be a snapshot of the table's state at this point in time. :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON or ION. + :param check_interval: The amount of time in seconds to wait between attempts. Only if ``export_time`` is + provided. + :param max_attempts: The maximum number of attempts to be made. Only if ``export_time`` is provided. """ template_fields: Sequence[str] = ( @@ -104,6 +107,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): "process_func", "export_time", "export_format", + "check_interval", + "max_attempts", ) template_fields_renderers = { @@ -121,6 +126,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes, export_time: datetime | None = None, export_format: str = "DYNAMODB_JSON", + check_interval: int = 30, + max_attempts: int = 60, **kwargs, ) -> None: super().__init__(**kwargs) @@ -132,6 +139,8 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): self.s3_key_prefix = s3_key_prefix self.export_time = export_time self.export_format = export_format + self.check_interval = check_interval + self.max_attempts = max_attempts @cached_property def hook(self): @@ -164,7 +173,10 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): ) waiter = self.hook.get_waiter("export_table") export_arn = response.get("ExportDescription", {}).get("ExportArn") - waiter.wait(ExportArn=export_arn) + waiter.wait( + ExportArn=export_arn, + WaiterConfig={"Delay": self.check_interval, "MaxAttempts": self.max_attempts}, + ) def _export_entire_data(self): """Export all data from the table.""" diff --git a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py index 70ea9ad77e..dc08e2d5b9 100644 --- a/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py +++ b/tests/system/providers/amazon/aws/example_dynamodb_to_s3.py @@ -172,6 +172,8 @@ with DAG( s3_key_prefix=f"{S3_KEY_PREFIX}-3-", ) # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time] + # This operation can take a long time to complete + backup_db_to_point_in_time.max_attempts = 90 delete_table = delete_dynamodb_table(table_name=table_name)