This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fa731037501 Do not run export operations in `example_dynamodb_to_s3` 
(#54158)
fa731037501 is described below

commit fa731037501699e40a9ea72cfbffc0026638267a
Author: Vincent <[email protected]>
AuthorDate: Wed Aug 6 11:30:03 2025 -0400

    Do not run export operations in `example_dynamodb_to_s3` (#54158)
---
 .../system/amazon/aws/example_dynamodb_to_s3.py    | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py 
b/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
index 3a6f8fc1a93..7c56d9ddf9c 100644
--- a/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
+++ b/providers/amazon/tests/system/amazon/aws/example_dynamodb_to_s3.py
@@ -17,12 +17,13 @@
 from __future__ import annotations
 
 import logging
-from datetime import datetime, timedelta
+from datetime import datetime
 
 import boto3
 import tenacity
 from tenacity import before_log, before_sleep_log
 
+from airflow.exceptions import AirflowSkipException
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, 
S3DeleteBucketOperator
 from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import 
DynamoDBToS3Operator
 
@@ -117,7 +118,7 @@ def delete_dynamodb_table(table_name: str):
 @task_group
 def incremental_export(table_name: str, start_time: datetime):
     """
-    Incremental export requires a minimum window of 15 minutes of data to 
export.
+    Export functions can take a lot of time.
     This task group allows us to have the sample code snippet for the docs 
while
     skipping the task when we run the actual test.
     """
@@ -152,13 +153,27 @@ def incremental_export(table_name: str, start_time: 
datetime):
     # This operation can take a long time to complete
     backup_db_to_point_in_time_incremental_export.max_attempts = 90
 
-    @task.short_circuit()
-    def should_run_incremental_export(start_time: datetime, end_time: 
datetime):
-        return end_time >= (start_time + timedelta(minutes=15))
+    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+    backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
+        task_id="backup_db_to_point_in_time_full_export",
+        dynamodb_table_name=table_name,
+        s3_bucket_name=bucket_name,
+        point_in_time_export=True,
+        export_time=export_time,
+        s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
+    )
+    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
+    backup_db_to_point_in_time_full_export.max_attempts = 90
+
+    @task
+    def stop_execution():
+        raise AirflowSkipException("Skipping this task.")
 
-    should_run_incremental = 
should_run_incremental_export(start_time=start_time, end_time=end_time)
+    stop = stop_execution()
 
-    chain(end_time, should_run_incremental, 
backup_db_to_point_in_time_incremental_export)
+    chain(
+        end_time, stop, backup_db_to_point_in_time_incremental_export, 
backup_db_to_point_in_time_full_export
+    )
 
 
 with DAG(
@@ -217,17 +232,6 @@ with DAG(
     # [END howto_transfer_dynamodb_to_s3_segmented]
 
     export_time = get_export_time(table_name)
-    # [START howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
-    backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
-        task_id="backup_db_to_point_in_time_full_export",
-        dynamodb_table_name=table_name,
-        s3_bucket_name=bucket_name,
-        point_in_time_export=True,
-        export_time=export_time,
-        s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
-    )
-    # [END howto_transfer_dynamodb_to_s3_in_some_point_in_time_full_export]
-    backup_db_to_point_in_time_full_export.max_attempts = 90
 
     delete_table = delete_dynamodb_table(table_name=table_name)
 
@@ -249,7 +253,6 @@ with DAG(
         backup_db_segment_1,
         backup_db_segment_2,
         export_time,
-        backup_db_to_point_in_time_full_export,
         incremental_export(table_name=table_name, start_time=export_time),
         # TEST TEARDOWN
         delete_table,

Reply via email to