ldacey commented on issue #53593:
URL: https://github.com/apache/airflow/issues/53593#issuecomment-3097951801

   > Are you using mapped tasks on either end?
   
   No - no mapped tasks at all. 
   
   My DAG files are registered using config files so they don't show much. 
   
   Here is an older one where `lower_bound` pulls the timestamp through XCom 
from the previous run directly in a DAG template file (less abstraction than 
the examples above):
   
   ```python
       @dag(**_get_base_dag_args(config))
       def extract_dag() -> None:
           """Pipeline to extract data from database incrementally."""
           from common.taskflow.extract.database import extract_records
           from common.taskflow.triggers.transform import trigger_transformation_dag
   
           @task(
               task_id="notify_extraction_start",
               task_display_name="Notify Webex",
           )
           def notify_extraction_start() -> None:
               """Notifies the Webex Teams space that the pipeline has started."""
   
           start = notify_extraction_start()
   
           extracted_data = extract_records.override(
               task_id="extract_records",
               task_display_name="Query Database Records",
               executor_config=config.get_resources(),
           )(
               config=config,
               lower_bound="{{ params.lower_bound or ti.xcom_pull(task_ids='extract_records', key='max_value', include_prior_dates=True) or dag.start_date }}",
           )
   
           start >> extracted_data
   
           trigger_transformation_dag.override(
               task_id="trigger_transformation_dag",
               task_display_name="Trigger Transform",
               outlets=[config.get_destination_dataset()().airflow_asset],
           )(
               config=config,
               files=extracted_data,
               message_id="{{ ti.xcom_pull(task_ids='notify_extraction_start') }}",
           )
   
       return extract_dag()
   ```
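
The fallback order in the `lower_bound` template can be sketched in plain Python. This is only an illustration of how an `a or b or c` chain resolves, not Airflow's rendering code: `resolve_lower_bound` is a hypothetical helper, and the idea that `xcom_pull` yields an empty value on 3.0.3 is an assumption inferred from the logs, not a confirmed cause.

```python
from datetime import datetime


def resolve_lower_bound(param_value, xcom_value, dag_start_date):
    """Hypothetical helper: return the first truthy value, like the template's `or` chain."""
    return param_value or xcom_value or dag_start_date


# Illustrative value taken from the 3.0.3 log lines (the DAG start).
DAG_START = datetime(2022, 12, 31, 18, 30)

# No param override, previous run's max_value is found: the XCom value wins.
print(resolve_lower_bound(None, "2025-07-15 18:10:18", DAG_START))

# Assumed 3.0.3 symptom: the pull yields None, so the chain falls through
# to the DAG start date without raising an error.
print(resolve_lower_bound(None, None, DAG_START))
```

Because the last operand is always truthy, a failed pull degrades silently to a full reload from the DAG start instead of failing the task.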
   
   The XCom view in the UI shows the correct XCom, including the `max_value` 
key that should have been pulled.
   
   <img width="902" height="93" alt="Image" src="https://github.com/user-attachments/assets/9b18293a-a3c7-484a-82aa-6016d18d5cf2" />
   
   **Airflow 3.0.2 (lower bound is correct from previous run `max_value`)**
   
   > {"timestamp":"2025-07-16T16:07:24.464260","level":"info","event":"Final query parameters: {'lower_bound': '2025-07-15 18:10:18', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   >
   > {"timestamp":"2025-07-17T16:07:32.238572","level":"info","event":"Final query parameters: {'lower_bound': '2025-07-16 18:08:48', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   
   
   **Airflow 3.0.3 (lower bound is the DAG start). I did not immediately notice 
the issue because the DAG still ran successfully:**
   
   > {"timestamp":"2025-07-18T16:07:22.463658","level":"info","event":"Final query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   >
   > {"timestamp":"2025-07-19T16:07:48.413275","level":"info","event":"Final query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   >
   > {"timestamp":"2025-07-20T16:17:58.726883","level":"info","event":"Final query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   
   
   **Airflow 3.0.2 (downgraded DB and reinstalled 3.0.2):**
   
   > {"timestamp":"2025-07-21T03:47:44.358213","level":"info","event":"Final query parameters: {'lower_bound': '2025-07-19 18:06:54', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   >
   > {"timestamp":"2025-07-21T16:07:25.134960","level":"info","event":"Final query parameters: {'lower_bound': '2025-07-20 18:17:46', 'max_records': 8500000, 'incremental_column': 'processeddatetime'}","logger":"common.models.extract.database"}
   

