This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fa731037501 Do not run export operations in `example_dynamodb_to_s3` (#54158)
fa731037501 is described below
commit fa731037501699e40a9ea72cfbffc0026638267a
Author: Vincent <[email protected]>
AuthorDate: Wed Aug 6 11:30:03 2025 -0400
Do not run export operations in `example_dynamodb_to_s3` (#54158)
---
.../system/amazon/aws/example_dynamodb_to_s3.py | 41 ++++++++++++----------
1 file changed, 22 insertions(+), 19 deletions(-)
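For readers skimming the diff below: the change replaces a conditional `@task.short_circuit()` gate with a task that always raises `AirflowSkipException`, so the export operators stay in the documented code snippet but never execute during the system test. A minimal sketch of that pattern, with a hypothetical DAG id and a placeholder operator standing in for the real exports (not the exact code from this commit):

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="skip_gate_sketch", schedule=None, start_date=datetime(2025, 1, 1)):

    @task
    def stop_execution():
        # Raising AirflowSkipException marks this task as "skipped"; with the
        # default all_success trigger rule, every task downstream of it is
        # skipped as well, so the expensive exports never run.
        raise AirflowSkipException("Skipping this task.")

    # Hypothetical stand-in for the long-running export operators.
    expensive_export = EmptyOperator(task_id="expensive_export")

    chain(stop_execution(), expensive_export)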
diff --git a/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py b/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
index 3a6f8fc1a93..7c56d9ddf9c 100644
--- a/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
+++ b/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
@@ -17,12 +17,13 @@
from __future__ import annotations
import logging
-from datetime import datetime, timedelta
+from datetime import datetime
import boto3
import tenacity
from tenacity import before_log, before_sleep_log
+from airflow.exceptions import AirflowSkipException
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
@@ -117,7 +118,7 @@ def delete_dynamodb_table(table_name: str):
@task_group
def incremental_export(table_name: str, start_time: datetime):
"""
- Incremental export requires a minimum window of 15 minutes of data to export.
+ Export functions can take a lot of time.
This task group allows us to have the sample code snippet for the docs while
skipping the task when we run the actual test.
"""
@@ -152,13 +153,27 @@ def incremental_export(table_name: str, start_time: datetime):
# This operation can take a long time to complete
backup_db_to_point_in_time_incremental_export.max_attempts = 90
- @task.short_circuit()
- def should_run_incremental_export(start_time: datetime, end_time: datetime):
- return end_time >= (start_time + timedelta(minutes=15))
+ # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+ backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
+ task_id="backup_db_to_point_in_time_full_export",
+ dynamodb_table_name=table_name,
+ s3_bucket_name=bucket_name,
+ point_in_time_export=True,
+ export_time=export_time,
+ s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
+ )
+ # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+ backup_db_to_point_in_time_full_export.max_attempts = 90
+
+ @task
+ def stop_execution():
+ raise AirflowSkipException("Skipping this task.")
- should_run_incremental = should_run_incremental_export(start_time=start_time, end_time=end_time)
+ stop = stop_execution()
- chain(end_time, should_run_incremental, backup_db_to_point_in_time_incremental_export)
+ chain(
+ end_time, stop, backup_db_to_point_in_time_incremental_export, backup_db_to_point_in_time_full_export
+ )
with DAG(
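For reference, the gate deleted in the hunk above, reconstructed from the removed lines with comments, only let the exports run once a 15-minute window of data existed; the new stop_execution task skips unconditionally:

from datetime import datetime, timedelta

from airflow.decorators import task


@task.short_circuit()
def should_run_incremental_export(start_time: datetime, end_time: datetime):
    # Returning False from a short_circuit task skips everything downstream.
    # Incremental exports need at least 15 minutes of table activity, so the
    # gate checked that the export window was wide enough before proceeding.
    return end_time >= (start_time + timedelta(minutes=15))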
@@ -217,17 +232,6 @@ with DAG(
# [END howto_transfer_dynamodb_to_s3_segmented]
export_time = get_export_time(table_name)
- # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
- backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
- task_id="backup_db_to_point_in_time_full_export",
- dynamodb_table_name=table_name,
- s3_bucket_name=bucket_name,
- point_in_time_export=True,
- export_time=export_time,
- s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
- )
- # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
- backup_db_to_point_in_time_full_export.max_attempts = 90
delete_table = delete_dynamodb_table(table_name=table_name)
@@ -249,7 +253,6 @@ with DAG(
backup_db_segment_1,
backup_db_segment_2,
export_time,
- backup_db_to_point_in_time_full_export,
incremental_export(table_name=table_name, start_time=export_time),
# TEST TEARDOWN
delete_table,
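Net effect: the full-export operator now lives inside the incremental_export task group, behind the skip task, instead of running from the top-level chain. A rough sketch of that encapsulation, assuming a placeholder operator in place of the two DynamoDBToS3Operator exports:

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator


@task_group
def incremental_export_sketch(table_name: str):
    @task
    def stop_execution():
        raise AirflowSkipException("Skipping this task.")

    # Placeholder for backup_db_to_point_in_time_incremental_export and
    # backup_db_to_point_in_time_full_export from the real example.
    exports = EmptyOperator(task_id="exports")

    chain(stop_execution(), exports)


with DAG(dag_id="dynamodb_to_s3_sketch", schedule=None, start_date=datetime(2025, 1, 1)):
    incremental_export_sketch(table_name="some_table")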