xmariachi opened a new issue, #35474:
URL: https://github.com/apache/airflow/issues/35474

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Version: 2.5.1
   Run env: MWAA on AWS
   
   Summary: Roughly once every 500-1000 runs, the task hangs indefinitely until it is manually killed, blocking any other task from being scheduled for this dag; as a result, its `execution_timeout` is not enforced.
   In my experience, it only happens on tasks that consume from Kafka using the `confluent_kafka` library. The `execution_timeout` is enforced in other tasks.
   
   Dag definition code: 
   
   ```python
   from datetime import timedelta

   import pendulum
   from airflow.decorators import dag

   # on_failure_callback, SERVICE_NAME and ingest_from_kafka_and_save
   # are defined elsewhere in the project

   # Dag Info
   default_args = {
       "retries": 3,
       "on_failure_callback": on_failure_callback,
       "sla": timedelta(hours=2),
       "execution_timeout": timedelta(hours=4),
   }


   @dag(SERVICE_NAME,
        default_args=default_args,
        schedule_interval="*/5 * * * *",
        start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
        catchup=True,
        tags=['critical', 'dumper', 'kafka'],
        max_active_runs=1)
   def process_records():
       ingest_from_kafka_and_save()


   process_records()
   ```
   
   The `ingest_from_kafka_and_save()` task contains code that consumes from Kafka, passing a callback function to the consumer (which I suspect may be related to the problem, since it runs asynchronously).
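   For reference, the consumption loop has roughly this shape (a simplified sketch, not the real code: `poll` here is a stand-in for `confluent_kafka.Consumer.poll`, and the limits are illustrative). Bounding every poll and enforcing an overall wall-clock deadline in Python keeps control returning to the interpreter regularly:

   ```python
   import time

   def consume_until_done(poll, max_messages=1000, idle_timeout=30.0, deadline=3600.0):
       """Consume via repeated bounded polls with an overall wall-clock deadline.

       `poll` is any callable(timeout) -> message-or-None (e.g. a
       confluent_kafka Consumer.poll). Because every poll is bounded,
       control keeps returning to the Python interpreter, so signals
       (and hence Airflow's execution_timeout) can be delivered.
       """
       start = time.monotonic()
       last_msg = start
       messages = []
       while len(messages) < max_messages:
           now = time.monotonic()
           if now - start > deadline or now - last_msg > idle_timeout:
               break  # reached end of topic (idle) or the overall deadline
           msg = poll(1.0)  # short, bounded poll - never block indefinitely
           if msg is None:
               continue
           last_msg = time.monotonic()
           messages.append(msg)
       return messages

   # usage with a stand-in poll that yields three messages, then nothing
   feed = iter(["a", "b", "c"])
   msgs = consume_until_done(lambda t: next(feed, None), idle_timeout=0.5, deadline=5.0)
   print(msgs)  # -> ['a', 'b', 'c']
   ```

   With a real consumer the stand-in would be replaced by `lambda t: consumer.poll(t)`.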
   
   It's hard to reproduce since it is intermittent and only happens once in a while.
   The Audit Log does not show anything special - the task just seems to hang indefinitely.
   The consumption code itself works fine otherwise and has been running for months in this and other dags that use it - but those dags show the same behaviour too.
   
   
   ### What you think should happen instead
   
   The `execution_timeout` should be enforced and the task should be killed so that a new one can be scheduled.
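   Some context on how the timeout is applied, which may explain why it goes unenforced here: Airflow's `execution_timeout` is implemented inside the task process with a SIGALRM-based timer (`airflow.utils.timeout`), and CPython only runs signal handlers on the main thread, between bytecode instructions. If the task is stuck inside a native call that never returns to the interpreter (e.g. a blocking consume deep in librdkafka), the raise is deferred indefinitely. A minimal sketch of the mechanism (POSIX-only; `TaskTimeout` is a stand-in for Airflow's `AirflowTaskTimeout`):

   ```python
   import signal

   class TaskTimeout(Exception):
       """Stand-in for Airflow's AirflowTaskTimeout."""

   def _handler(signum, frame):
       raise TaskTimeout("execution_timeout exceeded")

   signal.signal(signal.SIGALRM, _handler)
   signal.setitimer(signal.ITIMER_REAL, 0.1)  # fire SIGALRM after 100 ms

   timed_out = False
   try:
       while True:          # pure-Python loop: the handler runs between bytecodes
           pass
   except TaskTimeout:
       timed_out = True     # here the timeout IS enforced
   finally:
       signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer

   print(timed_out)  # -> True
   # If the loop were instead a single blocking C call that never returns
   # to the interpreter, the handler could not run and the task would hang.
   ```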
   
   ### How to reproduce
   
   It is hard to reproduce, since it happens very infrequently.
   
   * Create a dag with the definition in the "What happened" section
   * Add a function with a basic Kafka consumption that reads from a topic until the end of its partitions (or until a max number of messages)
   * Leave it running and wait for the problem to happen
   
   ### Operating System
   
   MWAA on AWS
   
   ### Versions of Apache Airflow Providers
   
   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
   apache-airflow-providers-amazon
   apache-airflow-providers-snowflake==4.0.2
   apache-airflow-providers-mysql==4.0.0
   apache-airflow-providers-slack
   confluent-kafka==2.1.0
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   Medium sized cluster
   Version 2.5.1; latest update applied 2 weeks ago.
   
   ### Anything else
   
   Unclear what triggers the error - but whatever it is, the task should be killed to enforce the `execution_timeout`.
   It looks like an internal thread-management issue.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

