AmosG opened a new issue, #59166:
URL: https://github.com/apache/airflow/issues/59166

   # DAG Processor Crashes on MySQL Connection Failure During Import Error 
Recording
   
   ## Description
   
   The DAG processor crashes and enters restart loops when MySQL connection 
fails while recording DAG import errors to the database. This is due to missing 
session cleanup (`session.rollback()`) after caught exceptions, leaving the 
SQLAlchemy session in an invalid state.
   
   ## Apache Airflow Version
   
   **Affected Version:** 3.1.3 (likely affects 3.0.x and 3.1.x series)
   
   ## Environment
   
   - **Deployment:** Kubernetes, Docker Compose
   - **Database:** MySQL 8.0
   - **Executor:** KubernetesExecutor, LocalExecutor
   
   ## Problem Description
   
   ### Expected Behavior
   
   When a DAG has import errors (e.g., `ModuleNotFoundError`):
   1. Import error should be caught during parsing
   2. Error should be recorded to the database
   3. If database operation fails, it should be logged or retried
   4. DAG processor should continue processing other DAGs
   5. Import errors should eventually appear in the Airflow UI
   
   ### Actual Behavior
   
   When MySQL connection fails during import error recording:
   1. Import error is caught during parsing ✓
   2. Database operation fails (connection timeout, pool exhaustion, network 
issue) ✗
   3. Exception is caught but **session is NOT rolled back** ✗
   4. `session.flush()` attempts to flush the invalid session ✗
   5. New exception is raised (`OperationalError` or `PendingRollbackError`) ✗
   6. **DAG processor process crashes with exit code 1** ✗
   7. In production with restart policy: **continuous restart loop** ✗
   
   ### Production Impact
   
   - DAG processor restarted **1,259 times in 4 days** (~13 restarts/hour)
   - Connection pool exhaustion
   - Cascading failures across Airflow components
   - Import errors not visible in UI
   - System instability
   
   ## Root Cause
   
   **File:** `airflow-core/src/airflow/dag_processing/collection.py`  
   **Function:** `update_dag_parsing_results_in_db()`  
   **Lines:** ~430-447
   
   ```python
   # Current problematic code:
   try:
       _update_import_errors(...)
   except Exception:
       log.exception("Error logging import errors!")
       # ❌ MISSING: session.rollback()
   
   try:
       _update_dag_warnings(...)
   except Exception:
       log.exception("Error logging DAG warnings.")
       # ❌ MISSING: session.rollback()
   
   session.flush()  # ❌ NO ERROR HANDLING - crashes if session invalid
   ```
   
   **The Issue:**
   1. MySQL connection fails during `_update_import_errors()`
   2. Exception is caught but `session.rollback()` is NOT called
   3. Session remains in invalid transaction state
   4. `session.flush()` attempts to flush invalid session
   5. New exception (`OperationalError`/`PendingRollbackError`) is raised
   6. This exception is NOT caught → process crashes
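
   The sequence above can be reproduced outside Airflow with plain SQLAlchemy. The following is a minimal sketch (not Airflow code): it uses an in-memory SQLite database, and a NOT NULL violation stands in for the MySQL connection failure; the `Item` model is made up for illustration:

   ```python
   from sqlalchemy import Column, Integer, String, create_engine
   from sqlalchemy.exc import PendingRollbackError
   from sqlalchemy.orm import Session, declarative_base

   Base = declarative_base()

   class Item(Base):
       __tablename__ = "item"
       id = Column(Integer, primary_key=True)
       name = Column(String, nullable=False)

   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   session = Session(engine)

   # Steps 1-2: a write fails mid-transaction
   # (the NOT NULL violation stands in for the lost MySQL connection)
   session.add(Item(name=None))
   try:
       session.flush()
   except Exception:
       pass  # Step 3: exception caught, session.rollback() NOT called

   # Steps 4-5: the next flush hits the invalid session state
   try:
       session.flush()
       error = None
   except PendingRollbackError as exc:
       error = exc

   print(type(error).__name__)  # PendingRollbackError

   # With a rollback after the first failure, the session is usable again
   session.rollback()
   session.flush()  # succeeds: session is healthy, nothing pending
   ```

   The key point the sketch demonstrates: once a flush fails, every subsequent use of the session raises until `rollback()` is called, which is exactly what turns one transient database error into a crash.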
   
   ## Proposed Solution
   
   Add `session.rollback()` calls after caught exceptions and wrap 
`session.flush()` in error handling:
   
   ```python
   # Fixed code:
   try:
       _update_import_errors(...)
   except Exception:
       log.exception("Error logging import errors!")
       session.rollback()  # ✅ FIX: Clean up invalid session state
   
   try:
       _update_dag_warnings(...)
   except Exception:
       log.exception("Error logging DAG warnings.")
       session.rollback()  # ✅ FIX: Clean up invalid session state
   
   try:
       session.flush()  # ✅ FIX: Wrapped in error handling
   except Exception:
       log.exception("Error flushing session after parsing results")
       session.rollback()  # ✅ FIX: Don't crash - continue processing
   ```
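
   The repeated try/except/rollback pattern could also be factored into a small helper. Below is a hypothetical `suppress_and_rollback` context manager (not part of Airflow's API), shown with a stub session so the sketch runs standalone:

   ```python
   import logging
   from contextlib import contextmanager

   log = logging.getLogger(__name__)

   @contextmanager
   def suppress_and_rollback(session, message):
       """Log any exception, roll the session back, and keep going."""
       try:
           yield
       except Exception:
           log.exception(message)
           session.rollback()  # restore the session to a usable state

   # Stub session so the sketch runs without a database
   class StubSession:
       def __init__(self):
           self.rolled_back = False

       def rollback(self):
           self.rolled_back = True

   session = StubSession()
   with suppress_and_rollback(session, "Error logging import errors!"):
       raise RuntimeError("simulated lost MySQL connection")

   print(session.rolled_back)  # True
   ```

   Whether Airflow would want a helper like this or the inline fix above is a style question; either way the invariant is the same: no code path may leave the session un-rolled-back after a caught database error.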
   
   ## Error Messages
   
   ### Lost Connection (Transient)
   ```
   sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) 
   (2013, 'Lost connection to MySQL server during query')
   ```
   
   ### Connection Refused (MySQL Down)
   ```
   sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) 
   (2003, "Can't connect to MySQL server on 'mysql'")
   ```
   
   ### Invalid Session State (After Caught Exception)
   ```
   sqlalchemy.exc.PendingRollbackError: 
   Can't reconnect until invalid transaction is rolled back.
   Please rollback() fully before proceeding
   ```
   
   All of these errors lead to the same crash: because the caught exception is never rolled back, the unguarded `session.flush()` re-raises and the process exits.
   
   ## Workarounds (Temporary)
   
   While these reduce frequency, they don't prevent crashes:
   - Increase connection pool settings
   - Reduce DAG processor resource limits
   - Increase liveness probe timeout
   - Monitor and alert on high restart counts
   
   **None of these are real solutions** - the code needs proper session cleanup.
   
   ## Why This is Important
   
   This violates distributed systems best practices:
   - Network hiccups are **normal, expected events** in distributed systems
   - Database timeouts happen under load
   - Systems should **retry transient failures** automatically
   - Code must handle recoverable errors gracefully
   
   Other Airflow components (Scheduler, Triggerer) handle database failures 
properly - only the DAG processor lacks proper error recovery for this code 
path.
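
   For transient failures specifically, a bounded retry with backoff is the usual remedy. The helper below is illustrative only (not existing Airflow code; the attempt count and delay are arbitrary):

   ```python
   import time

   from sqlalchemy.exc import OperationalError

   def retry_transient(fn, attempts=3, base_delay=0.05):
       """Call fn, retrying OperationalError with exponential backoff."""
       for attempt in range(attempts):
           try:
               return fn()
           except OperationalError:
               if attempt == attempts - 1:
                   raise  # out of retries: let the caller decide
               time.sleep(base_delay * 2 ** attempt)

   # Simulate a connection that fails twice, then recovers
   calls = {"n": 0}

   def flaky_write():
       calls["n"] += 1
       if calls["n"] < 3:
           raise OperationalError("INSERT ...", {}, Exception("2013: lost connection"))
       return "ok"

   result = retry_transient(flaky_write)
   print(result, calls["n"])  # ok 3
   ```

   Even without retries, the rollback fix alone already achieves the "continue processing" behavior; retries would additionally let import errors reach the database on the next attempt rather than the next parse loop.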
   
   ## Benefits of Fix
   
   1. **Graceful degradation:** DAG processor continues running during database 
issues
   2. **Automatic recovery:** Import errors saved when connection recovers
   3. **No restart loops:** Eliminates cascading failures
   4. **Better resilience:** Handles network hiccups and transient errors
   5. **Production stability:** Prevents connection pool exhaustion from 
restart storms
   
   ## Additional Context
   
   - Reproduced in both Docker Compose and Kubernetes environments
   - 100% reproducible when DAGs have import errors and MySQL connection fails
   - Configuration tuning (pool size, timeouts) does not address the root cause
   - The fix is straightforward and consistent with error handling elsewhere in 
Airflow
   
   ## Are you willing to submit PR?
   
   Yes, I have already prepared the fix with comprehensive unit tests.
   

