1fanwang opened a new issue, #66786:
URL: https://github.com/apache/airflow/issues/66786

   ### Apache Airflow version
   
   main (3.x)
   
   ### What happened?
   
   In `airflow/dag_processing/collection.py::_serialize_dag_capturing_errors` 
(called per-Dag from `update_dag_parsing_results_in_db`), the wrapping `try / 
except OperationalError: raise / except Exception` block around 
`SerializedDagModel.write_dag()` catches every non-`OperationalError` and 
records it as an import error. That includes the harmless `IntegrityError` 
raised when two Dag-processor workers race to insert the same brand-new Dag row.
   
   The loser's transaction is invalid at that point — any subsequent work on 
the same session emits `PendingRollbackError` — and the file ends up listed in 
`import_errors` for the rest of that parsing cycle, even though the winning 
processor's INSERT succeeded and the Dag is correctly serialized. From the 
user's perspective, the Dag file shows as broken in the UI for a parsing cycle 
despite being fine on disk and fully synced in the DB.
   
   ### What you think should happen instead?
   
   A unique-constraint `IntegrityError` raised by a concurrent peer that won 
the INSERT race is not an import error — the file has been fully synced by that 
peer. The handler should:
   
   1. Roll the session back so subsequent per-Dag writes in the same 
`update_dag_parsing_results_in_db` call don't raise `PendingRollbackError`.
   2. Return an empty import-error list for that Dag (the winner already 
recorded any real errors).
   3. Not retry — the row exists, so the write is a no-op.
   
   Other `IntegrityError` causes (e.g. NOT-NULL violations from a genuinely 
malformed Dag) should still flow into the existing `Exception` arm and be 
recorded as import errors.
   
   Detecting the unique-constraint case is consistent with how 
`airflow/api_fastapi/common/exceptions.py::_UniqueConstraintErrorHandler` 
already does it — matching on the dialect-specific message prefix in 
`str(exc.orig)` (`UNIQUE constraint failed` for SQLite, `Duplicate entry` for 
MySQL, `violates unique constraint` for Postgres).
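
The message check described above could look roughly like the following. This is a minimal sketch, not Airflow's actual handler: the helper name `is_unique_violation` and the `_UNIQUE_MARKERS` tuple are hypothetical, and the marker strings are only the ones quoted in this issue. It uses substring matching rather than a strict prefix test because some drivers put an error code or other text ahead of these fragments.

```python
# Hypothetical helper for illustration; marker strings are those quoted in this issue.
_UNIQUE_MARKERS = (
    "UNIQUE constraint failed",    # SQLite
    "Duplicate entry",             # MySQL
    "violates unique constraint",  # Postgres
)


def is_unique_violation(exc: BaseException) -> bool:
    """Return True if the driver-level message looks like a unique-constraint failure."""
    # SQLAlchemy's DBAPI-wrapping errors expose the driver exception as `.orig`;
    # fall back to the exception itself so raw driver errors also work.
    orig = getattr(exc, "orig", None)
    message = str(orig if orig is not None else exc)
    return any(marker in message for marker in _UNIQUE_MARKERS)
```

Anything that does not match (a NOT NULL violation, for example) returns `False` and would stay on the import-error path.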
   
   ### How to reproduce
   
   Run two Dag-processors against a shared metadata DB pointing at the same 
`dag_folder`. Drop a brand-new Dag file. When both processors call 
`SerializedDagModel.write_dag()` for that file at nearly the same time:
   
   - One processor commits the INSERT into `serialized_dag` / `dag` / 
`dag_version`.
   - The other raises `IntegrityError` from the unique constraint.
   - The file appears in `import_errors` even though it is correctly serialized.
   
   The race is more common on MySQL/InnoDB under load; Postgres surfaces it 
less often because of different lock granularity, but the failure shape is the 
same.
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Reproduced with multiple Dag-processors against the same metadata DB; not 
deployment-specific.
   
   ### Anything else?
   
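   Happy to send a PR. The fix is localized to 
`_serialize_dag_capturing_errors`: add an `except IntegrityError` arm before 
the generic `Exception` arm that detects the unique-constraint case via the 
dialect-specific message, rolls back, and returns `[]`. Non-unique 
`IntegrityError`s must still be recorded as import errors. Because a `raise` 
inside one `except` arm is not caught by a sibling arm of the same `try`, the 
new arm has to share the generic arm's recording logic rather than re-raise.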
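
A minimal sketch of the proposed control flow, under loud assumptions: it uses the standard-library `sqlite3` module instead of SQLAlchemy, a made-up two-column `dag` table, and hypothetical function names (`write_dag_capturing_errors`, `_record_import_error`). It is not the Airflow code. The race is simulated by inserting the same row twice on one connection. Both failure paths share a helper, since a bare `raise` in one `except` arm would propagate out of the `try` rather than reach the sibling `Exception` arm.

```python
import sqlite3


def _record_import_error(conn, dag_id, exc):
    # Shared by both failure arms: reset the transaction, then report the error.
    conn.rollback()
    return [(dag_id, str(exc))]


def write_dag_capturing_errors(conn, dag_id, fileloc):
    """Hypothetical stand-in for the per-Dag write; returns a list of import errors."""
    try:
        conn.execute(
            "INSERT INTO dag (dag_id, fileloc) VALUES (?, ?)", (dag_id, fileloc)
        )
        conn.commit()
        return []
    except sqlite3.OperationalError:
        # Mirrors the existing arm described in this issue: let these propagate.
        raise
    except sqlite3.IntegrityError as exc:
        if "UNIQUE constraint failed" in str(exc):
            # A peer already inserted the row: roll back, report nothing, no retry.
            conn.rollback()
            return []
        return _record_import_error(conn, dag_id, exc)
    except Exception as exc:
        return _record_import_error(conn, dag_id, exc)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT NOT NULL)")

winner = write_dag_capturing_errors(conn, "example", "/dags/example.py")
loser = write_dag_capturing_errors(conn, "example", "/dags/example.py")  # duplicate key
malformed = write_dag_capturing_errors(conn, "broken", None)  # NOT NULL violation
row_count = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
```

Here `winner` and `loser` both come back as empty lists, `malformed` carries one recorded error, and the connection remains usable after each rollback, which is the behaviour the three numbered points above ask for.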
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's Code of Conduct


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
