1fanwang commented on code in PR #66788:
URL: https://github.com/apache/airflow/pull/66788#discussion_r3232439118


##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors(
         return []
     except OperationalError:
         raise
+    except IntegrityError as exc:
+        # Multiple Dag processors writing the same brand-new Dag can race on the INSERT.
+        # The loser's transaction is already invalid, so we must roll the session back to
+        # avoid PendingRollbackError on subsequent per-Dag work in this parsing cycle.
+        # The winning peer already produced the correct row, so this is not an import error
+        # and we don't retry. Non-unique IntegrityErrors (e.g. NOT-NULL violations from a
+        # genuinely malformed Dag) fall through to the generic Exception arm.

Review Comment:
   These are the traces that land in `import_errors` on each dialect, captured 
by reverting the new `except IntegrityError` arm and rerunning the regression 
test against `main`:
   
   ```
   sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: serialized_dag.dag_id
   sqlalchemy.exc.IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry 'my_dag' for key 'serialized_dag.PRIMARY'")
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
   ```
   
   Without the new arm, the `IntegrityError` raised by the losing processor's 
`SerializedDagModel.write_dag` is caught by the existing generic `except 
Exception` arm in `_serialize_dag_capturing_errors`, formatted with 
`traceback.format_exc(...)`, and recorded as the import error for that Dag in 
the parsing cycle. The loser's now-invalid transaction also causes a 
`PendingRollbackError` on the next per-Dag write in the same 
`update_dag_parsing_results_in_db` call. I added a before/after pytest snippet 
to the PR body that surfaces each dialect's exact message.
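
For readers without the PR body at hand, here is a minimal standalone sketch of the handling described above. It is not the code in this PR: it uses only the stdlib `sqlite3` module instead of SQLAlchemy, the table is a toy two-column stand-in for `serialized_dag`, and `write_dag_row` is a hypothetical helper. The `conn.rollback()` call plays the role of rolling back the session; plain `sqlite3` has no `PendingRollbackError`, so the sketch only shows that later writes keep working once the failed transaction is discarded. Matching on the message text is also an assumption that holds for SQLite only.

```python
import sqlite3

# Toy stand-in for the real table (assumption: only two columns, SQLite only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT NOT NULL)"
)


def write_dag_row(conn, dag_id, data):
    """Hypothetical per-Dag write; returns a status string instead of raising."""
    try:
        conn.execute(
            "INSERT INTO serialized_dag (dag_id, data) VALUES (?, ?)",
            (dag_id, data),
        )
        conn.commit()
        return "written"
    except sqlite3.IntegrityError as exc:
        # Discard the failed transaction so later writes start clean.
        conn.rollback()
        if "UNIQUE constraint failed" in str(exc):
            # A peer already inserted this row: not an import error, no retry.
            return "lost_race"
        # Any other integrity failure (e.g. NOT NULL) is still reported.
        return f"import_error: {exc}"


outcomes = [
    write_dag_row(conn, "my_dag", "{}"),      # first writer wins
    write_dag_row(conn, "my_dag", "{}"),      # second writer loses the race
    write_dag_row(conn, "broken_dag", None),  # malformed row, NOT NULL violation
    write_dag_row(conn, "other_dag", "{}"),   # later work in the same cycle
]
row_count = conn.execute("SELECT COUNT(*) FROM serialized_dag").fetchone()[0]
```

The second write is classified as a lost race and the fourth still succeeds, while the NOT NULL failure is kept as an error, which mirrors the split between the new arm and the generic `except Exception` arm.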


