1fanwang opened a new issue, #66786: URL: https://github.com/apache/airflow/issues/66786
### Apache Airflow version

main (3.x)

### What happened?

In `airflow/dag_processing/collection.py::_serialize_dag_capturing_errors` (called per-Dag from `update_dag_parsing_results_in_db`), the wrapping `try / except OperationalError: raise / except Exception` block around `SerializedDagModel.write_dag()` catches every non-`OperationalError` exception and records it as an import error. That includes the harmless `IntegrityError` raised when two Dag-processor workers race to insert the same brand-new Dag row.

The loser's transaction is invalid at that point (any subsequent work on the same session emits `PendingRollbackError`), and the file ends up listed in `import_errors` for the rest of that parsing cycle, even though the winning processor's INSERT succeeded and the Dag is correctly serialized.

From the user's perspective, the Dag file shows as broken in the UI for a parsing cycle despite being fine on disk and fully synced in the DB.

### What you think should happen instead?

A unique-constraint `IntegrityError` raised by a concurrent peer that won the INSERT race is not an import error: the file has been fully synced by that peer. The handler should:

1. Roll the session back so subsequent per-Dag writes in the same `update_dag_parsing_results_in_db` call don't raise `PendingRollbackError`.
2. Return an empty import-error list for that Dag (the winner already recorded any real errors).
3. Not retry: the row exists, so the write is a no-op.

Other `IntegrityError` causes (e.g. NOT-NULL violations from a genuinely malformed Dag) should still flow into the existing `Exception` arm and be recorded as import errors.

Detecting the unique-constraint case is consistent with how `airflow/api_fastapi/common/exceptions.py::_UniqueConstraintErrorHandler` already does it: matching on the dialect-specific message prefix in `str(exc.orig)` (`UNIQUE constraint failed` for SQLite, `Duplicate entry` for MySQL, `violates unique constraint` for Postgres).
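The three handler steps and the message-based detection described above could be sketched roughly as follows. This is a minimal illustration only, not the Airflow code: it uses the standard-library `sqlite3` module in place of a SQLAlchemy session, and the names `UNIQUE_VIOLATION_MARKERS`, `is_unique_violation`, `write_capturing_errors`, and `demo`, as well as the two-column `serialized_dag` table, are hypothetical. It matches the markers as substrings rather than strict prefixes, which is an assumption about how driver messages are wrapped; only the SQLite marker is actually exercised here.

```python
import os
import sqlite3
import tempfile

# Message fragments per dialect, as listed in the issue text. Only the SQLite
# fragment is exercised by demo(); the other two are taken from the issue.
UNIQUE_VIOLATION_MARKERS = (
    "UNIQUE constraint failed",    # SQLite
    "Duplicate entry",             # MySQL
    "violates unique constraint",  # Postgres
)


def is_unique_violation(exc: Exception) -> bool:
    """Return True if the driver message looks like a unique-constraint hit."""
    message = str(exc)
    return any(marker in message for marker in UNIQUE_VIOLATION_MARKERS)


def write_capturing_errors(conn, fileloc, dag_id, data):
    """Hypothetical stand-in for the per-Dag write; returns import errors."""
    try:
        conn.execute(
            "INSERT INTO serialized_dag (dag_id, data) VALUES (?, ?)",
            (dag_id, data),
        )
        conn.commit()
        return []
    except sqlite3.IntegrityError as exc:
        # Step 1: roll back so later writes on this connection start clean.
        conn.rollback()
        if is_unique_violation(exc):
            # Steps 2 and 3: a peer already wrote the row; no error, no retry.
            return []
        # A `raise` here would not reach a sibling `except Exception` arm,
        # so this sketch records the error directly instead.
        return [(fileloc, str(exc))]
    except Exception as exc:
        conn.rollback()
        return [(fileloc, str(exc))]


def demo():
    """Simulate a winner and a loser writing the same brand-new row."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "meta.db")
        winner = sqlite3.connect(path)
        loser = sqlite3.connect(path)
        try:
            winner.execute(
                "CREATE TABLE serialized_dag ("
                "dag_id TEXT NOT NULL UNIQUE, data TEXT NOT NULL)"
            )
            winner.commit()
            first = write_capturing_errors(winner, "dags/a.py", "a", "{}")
            raced = write_capturing_errors(loser, "dags/a.py", "a", "{}")
            # The loser's connection must still be usable for the next Dag.
            later = write_capturing_errors(loser, "dags/b.py", "b", "{}")
            # A NOT NULL violation is a genuine error and is still recorded.
            broken = write_capturing_errors(loser, "dags/c.py", "c", None)
            return first, raced, later, broken
        finally:
            winner.close()
            loser.close()


if __name__ == "__main__":
    for result in demo():
        print(result)
```

In this sketch the duplicate insert yields an empty error list and the next write on the same connection succeeds, while the NOT NULL violation is still reported against its file. With a real SQLAlchemy session the rollback matters more than it does for `sqlite3`, since that is what avoids the `PendingRollbackError` mentioned above.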
### How to reproduce

Run two Dag-processors against a shared metadata DB pointing at the same `dag_folder`. Drop a brand-new Dag file. With timing such that both call `SerializedDagModel.write_dag()` in the same scheduler heartbeat:

- One processor commits the INSERT into `serialized_dag` / `dag` / `dag_version`.
- The other raises `IntegrityError` from the unique constraint.
- The file appears in `import_errors` even though it is correctly serialized.

The race is more common on MySQL/InnoDB under load; Postgres surfaces it less often because of different lock granularity, but the failure shape is the same.

### Operating System

Linux

### Versions of Apache Airflow Providers

n/a

### Deployment

Other

### Deployment details

Reproduced with multiple Dag-processors against the same metadata DB; not deployment-specific.

### Anything else?

Happy to send a PR. The fix is localized to `_serialize_dag_capturing_errors`: add an `except IntegrityError` arm before the generic `Exception` arm that detects the unique-constraint case via the dialect-specific message, rolls back, and returns `[]`; non-unique `IntegrityError`s re-raise into the generic arm.

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's Code of Conduct
