dabla opened a new issue, #64658:
URL: https://github.com/apache/airflow/issues/64658

   ### Apache Airflow version
   
   3.1.8
   
   ### What happened and how to reproduce it?
   
   We are experiencing an issue with long-running DAGs (>3 hours) where a task using `GenericTransfer` gets unexpectedly terminated. The task is actively processing data, but the scheduler incorrectly determines that the task is no longer in a running state and instructs it to terminate.
   
   [worker.log](https://github.com/user-attachments/files/26454822/worker.log)
   
   This appears to be a state desynchronization issue between the scheduler and 
the worker.
   
    ```python
        from datetime import datetime, timedelta

        # Import paths assume the common-sql provider; adjust to your provider versions.
        from airflow import DAG
        from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer
        from airflow.providers.common.sql.operators.sql import (
            SQLExecuteQueryOperator,
            SQLValueCheckOperator,
        )

        source_conn_id = "sas.prod"
        dest_conn_id = "postgres.dev"

        with DAG(
            default_args={
                "owner": "dabla",
                "email_on_failure": True,
                "email_on_retry": False,
                "retry_delay": timedelta(minutes=10),
            },
            dag_id="load_table_stg_full_load",
            schedule=None,
            start_date=datetime(2026, 4, 2),
            catchup=False,
            max_active_runs=1,
        ):
            load_data_to_postgres = GenericTransfer(
                task_id="load_data_to_postgres",
                source_conn_id=source_conn_id,
                destination_conn_id=dest_conn_id,
                destination_table='sa."TABLE_STG"',
                sql="SELECT * FROM BI.TABLE",
                preoperator=[
                    "sql/ddl/create_table_stg.sql",
                    "sql/ddl/create_table.sql",
                    'TRUNCATE TABLE sa."TABLE_STG";',
                ],
                insert_args={
                    "commit_every": 10000,
                    "autocommit": False,
                    "executemany": True,
                    "fast_executemany": True,
                },
                page_size=10000,
                paginated_sql_statement_clause="{0}(firstobs=%eval({2} + 1) obs=%eval({2} + {1})) ORDER BY ID;",
                retries=2,
            )

            get_source_row_count = SQLExecuteQueryOperator(
                task_id="get_source_row_count",
                conn_id=source_conn_id,
                sql="SELECT COUNT(*) AS row_count FROM BI.TABLE",
                output_processor=lambda result, _: result[0][0],
            )

            validate_target_row_count = SQLValueCheckOperator(
                task_id="validate_target_row_count_matches_source",
                conn_id=dest_conn_id,
                sql='SELECT COUNT(*) FROM sa."TABLE_STG"',
                pass_value=get_source_row_count.output,
            )

            load_data_to_postgres >> get_source_row_count >> validate_target_row_count
    ```
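   For context, the pagination clause expands into one statement per page. The sketch below is illustrative only: it assumes the `{0}`/`{1}`/`{2}` placeholders stand for the base SQL, the page size, and the current offset, which matches how the clause above reads but is not taken from the operator's source.
   
    ```python
    # Illustrative expansion of the paginated clause; the placeholder roles
    # ({0}=sql, {1}=page_size, {2}=offset) are an assumption, not verified
    # against the GenericTransfer implementation.
    CLAUSE = "{0}(firstobs=%eval({2} + 1) obs=%eval({2} + {1})) ORDER BY ID;"

    def paginated_statements(sql: str, page_size: int, total_rows: int) -> list[str]:
        """Build one SAS-style paginated statement per page of rows."""
        statements = []
        offset = 0
        while offset < total_rows:
            statements.append(CLAUSE.format(sql, page_size, offset))
            offset += page_size
        return statements

    pages = paginated_statements("SELECT * FROM BI.TABLE", 10000, 25000)
    ```
   
   For 25,000 rows and a page size of 10,000 this yields three statements, the last one requesting rows 20,001 through 30,000 (the source simply returns fewer rows on the final page).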
   
   ### What you think should happen instead?
   
   The task should not be killed while it is still actively processing data.
   
   ### Operating System
   
   Fedora 5.3
   
   ### Versions of Apache Airflow Providers
   
   Latest provider versions compatible with Airflow 3.1
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   Probably related to [#48719](https://github.com/apache/airflow/issues/48719)
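   
   If the kill turns out to be the scheduler's zombie/heartbeat detection, a possible mitigation while debugging is to raise the heartbeat timeout. The config key below is an assumption (this setting has been renamed across Airflow versions); verify it against the configuration reference for your deployment:
   
    ```ini
    # airflow.cfg -- value is illustrative, not a recommendation
    [scheduler]
    # Seconds a task instance may go without a heartbeat before the
    # scheduler considers it a zombie and terminates it.
    task_instance_heartbeat_timeout = 600
    ```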
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
