ldacey commented on issue #53593:
URL: https://github.com/apache/airflow/issues/53593#issuecomment-3097951801
> Are you using mapped tasks on either end?
No - no mapped tasks at all.
My DAG files are registered using config files, so they don't show much.
Here is an older one where `lower_bound` pulls the timestamp from the
previous run through XCom directly in the DAG template file (less
abstraction than the examples above):
```python
@dag(**_get_base_dag_args(config))
def extract_dag() -> None:
    """Pipeline to extract data from database incrementally."""
    from common.taskflow.extract.database import extract_records
    from common.taskflow.triggers.transform import trigger_transformation_dag

    @task(
        task_id="notify_extraction_start",
        task_display_name="Notify Webex",
    )
    def notify_extraction_start() -> None:
        """Notifies the Webex Teams space that the pipeline has started."""

    start = notify_extraction_start()

    extracted_data = extract_records.override(
        task_id="extract_records",
        task_display_name="Query Database Records",
        executor_config=config.get_resources(),
    )(
        config=config,
        lower_bound=(
            "{{ params.lower_bound"
            " or ti.xcom_pull(task_ids='extract_records',"
            " key='max_value', include_prior_dates=True)"
            " or dag.start_date }}"
        ),
    )
    start >> extracted_data

    trigger_transformation_dag.override(
        task_id="trigger_transformation_dag",
        task_display_name="Trigger Transform",
        outlets=[config.get_destination_dataset()().airflow_asset],
    )(
        config=config,
        files=extracted_data,
        message_id="{{ ti.xcom_pull(task_ids='notify_extraction_start') }}",
    )


# returned from the enclosing factory function that registers this DAG
return extract_dag()
```
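Note that the Jinja `or` chain resolves to the first truthy value, which is why the regression was silent: if the `ti.xcom_pull(..., include_prior_dates=True)` call starts returning `None`, the expression quietly falls back to `dag.start_date` and the DAG keeps succeeding. A minimal sketch of that resolution logic in plain Python (the function name is hypothetical, for illustration only):

```python
from datetime import datetime
from typing import Optional


def resolve_lower_bound(
    param: Optional[str],
    prior_xcom: Optional[str],
    dag_start_date: datetime,
) -> str:
    """Mirror the Jinja `a or b or c` chain: the first truthy value wins."""
    return param or prior_xcom or dag_start_date.isoformat(sep=" ")


# Healthy run (3.0.2): the prior-run XCom is found and wins.
assert resolve_lower_bound(
    None, "2025-07-15 18:10:18", datetime(2022, 12, 31, 18, 30)
) == "2025-07-15 18:10:18"

# Regressed run (3.0.3): the include_prior_dates pull comes back empty,
# so the chain silently falls back to the DAG start date.
assert resolve_lower_bound(
    None, None, datetime(2022, 12, 31, 18, 30)
) == "2022-12-31 18:30:00"
```

This matches the logs below: `2022-12-31 18:30:00` is exactly the DAG start date leaking through the fallback.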
And the XCom view in the UI shows the correct entry with the `max_value`
key that should have been pulled:
<img width="902" height="93" alt="Image"
src="https://github.com/user-attachments/assets/9b18293a-a3c7-484a-82aa-6016d18d5cf2"
/>
**Airflow 3.0.2 (lower bound correctly comes from the previous run's `max_value`):**
> {"timestamp":"2025-07-16T16:07:24.464260","level":"info","event":"Final
query parameters: {'lower_bound': '2025-07-15 18:10:18', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
>
> {"timestamp":"2025-07-17T16:07:32.238572","level":"info","event":"Final
query parameters: {'lower_bound': '2025-07-16 18:08:48', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
**Airflow 3.0.3 (lower bound falls back to the DAG start date). I did not
immediately notice the issue because the DAG still ran successfully:**
> {"timestamp":"2025-07-18T16:07:22.463658","level":"info","event":"Final
query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
>
> {"timestamp":"2025-07-19T16:07:48.413275","level":"info","event":"Final
query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
>
> {"timestamp":"2025-07-20T16:17:58.726883","level":"info","event":"Final
query parameters: {'lower_bound': '2022-12-31 18:30:00', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
**Airflow 3.0.2 (downgraded DB and reinstalled 3.0.2):**
> {"timestamp":"2025-07-21T03:47:44.358213","level":"info","event":"Final
query parameters: {'lower_bound': '2025-07-19 18:06:54', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}
>
>{"timestamp":"2025-07-21T16:07:25.134960","level":"info","event":"Final
query parameters: {'lower_bound': '2025-07-20 18:17:46', 'max_records':
8500000, 'incremental_column':
'processeddatetime'}","logger":"common.models.extract.database"}