ephraimbuddy commented on code in PR #66788:
URL: https://github.com/apache/airflow/pull/66788#discussion_r3257248952


##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -257,6 +257,21 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
     )
 
 
+# Dialect-specific message prefixes for unique-constraint IntegrityErrors. Kept in sync with
+# ``airflow.api_fastapi.common.exceptions._UniqueConstraintErrorHandler``.
+_UNIQUE_CONSTRAINT_ERROR_PREFIXES = (
+    "UNIQUE constraint failed",  # SQLite
+    "Duplicate entry",  # MySQL
+    "violates unique constraint",  # Postgres

Review Comment:
   String-matching `exc.orig` across three drivers is fragile and tightly 
couples this code to current driver wordings. Prefer fixing at the write site 
with `INSERT ... ON CONFLICT DO NOTHING` (or equivalent merge for the 
brand-new-dag path), so this `except` arm doesn't need to classify dialects at 
all. If we keep this approach, please guard against `exc.orig is None`.
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting
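A minimal sketch of the write-site alternative suggested above. It uses the standard-library `sqlite3` module so it runs without a server, and it assumes SQLite 3.24 or newer for the `ON CONFLICT` clause. The `dag_demo` table and the `insert_dag_if_absent` helper are hypothetical stand-ins, not Airflow's real models. In SQLAlchemy, the equivalent statement would come from `sqlalchemy.dialects.postgresql.insert(...).on_conflict_do_nothing()` or `sqlalchemy.dialects.sqlite.insert(...).on_conflict_do_nothing()`. MySQL has no `ON CONFLICT` clause, so it would need `INSERT IGNORE` or `ON DUPLICATE KEY UPDATE` instead.

```python
import sqlite3

# Hypothetical stand-in for a table keyed on dag_id.
SCHEMA = "CREATE TABLE IF NOT EXISTS dag_demo (dag_id TEXT PRIMARY KEY, fileloc TEXT NOT NULL)"

# The database resolves the conflict itself, so no IntegrityError reaches
# Python and there is no driver message to classify.
INSERT_IF_ABSENT = (
    "INSERT INTO dag_demo (dag_id, fileloc) VALUES (?, ?) "
    "ON CONFLICT (dag_id) DO NOTHING"
)


def insert_dag_if_absent(conn: sqlite3.Connection, dag_id: str, fileloc: str) -> bool:
    """Insert the row unless dag_id already exists; return True if this call wrote it."""
    with conn:  # commit on success, roll back on error
        cur = conn.execute(INSERT_IF_ABSENT, (dag_id, fileloc))
    # rowcount is 1 when the row was written and 0 when DO NOTHING fired.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
first = insert_dag_if_absent(conn, "abc", "/dags/abc.py")
second = insert_dag_if_absent(conn, "abc", "/dags/abc.py")  # a plain INSERT would raise here
```

This sketch covers only the brand-new-row case. The session that loses the race sees `rowcount == 0` and can skip the write without inspecting `exc.orig`. Updating an existing Dag would still go through the normal update path.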



##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors(
         return []
     except OperationalError:
         raise
+    except IntegrityError as exc:

Review Comment:
   Even if we keep the dialect-prefix approach for now, `log.info(...)` here
will fire on every collision and is going to be noisy on a healthy multi-DFP
cluster. Worse, if a real unique-constraint regression ever lands, it will
flood the logs at a level nobody is watching. Suggest `log.warning` at
minimum, and including the constraint name in the message when the DB driver
exposes it on `exc.orig` (psycopg2, for example, provides
`exc.orig.diag.constraint_name`).
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting



##########
airflow-core/tests/unit/dag_processing/test_collection.py:
##########
@@ -651,6 +651,109 @@ def test_serialized_dag_errors_are_import_errors(
         assert len(dag_import_error_listener.existing) == 0
         assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace
 
+    @pytest.mark.parametrize(
+        "orig_message",
+        [
+            pytest.param(

Review Comment:
   This test asserts that the *handler* routes a mocked `IntegrityError` 
correctly; it does not assert that two concurrent DFPs actually race or that 
the surrounding `update_dag_parsing_results_in_db` cycle recovers. Please add a 
`db_test` that races two sessions on the same brand-new Dag (e.g. two threads 
calling `SerializedDagModel.write_dag` against the same `dag_id`), so we have a 
regression test for the actual scenario, not just for the symptom string.
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting
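A sketch of the shape of race test requested above. It does not use Airflow's `SerializedDagModel.write_dag` or `db_test` fixtures. Instead, two threads with separate `sqlite3` connections write to a temporary on-disk database, and a `threading.Barrier` releases both writers together. With a plain `INSERT`, exactly one writer fails with `IntegrityError`, which is the symptom a regression test should reproduce. With `ON CONFLICT DO NOTHING`, both succeed and only one row is written. The table, the SQL, and the `race_writes` helper are hypothetical.

```python
import os
import sqlite3
import tempfile
import threading

# Hypothetical table: the unique key on dag_id is what makes the race observable.
SCHEMA = "CREATE TABLE IF NOT EXISTS dag_demo (dag_id TEXT PRIMARY KEY)"
PLAIN_INSERT = "INSERT INTO dag_demo (dag_id) VALUES (?)"
UPSERT = "INSERT INTO dag_demo (dag_id) VALUES (?) ON CONFLICT (dag_id) DO NOTHING"


def create_schema(db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(SCHEMA)
        conn.commit()
    finally:
        conn.close()


def race_writes(db_path: str, dag_id: str, sql: str, workers: int = 2):
    """Run sql for the same dag_id from several threads at once.

    Returns (rowcounts from writers that succeeded, exceptions from the rest).
    """
    barrier = threading.Barrier(workers)
    lock = threading.Lock()
    rowcounts = []
    errors = []

    def writer() -> None:
        # One connection per thread, like separate DFP sessions.
        # isolation_level=None: each INSERT commits on its own.
        # timeout=10: wait for the other writer's lock instead of failing fast.
        conn = sqlite3.connect(db_path, timeout=10, isolation_level=None)
        try:
            barrier.wait()  # release all writers together
            cur = conn.execute(sql, (dag_id,))
            with lock:
                rowcounts.append(cur.rowcount)
        except sqlite3.Error as exc:
            with lock:
                errors.append(exc)
        finally:
            conn.close()

    threads = [threading.Thread(target=writer) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return rowcounts, errors


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "race.db")
    create_schema(db)
    plain_counts, plain_errors = race_writes(db, "dag_plain", PLAIN_INSERT)
    upsert_counts, upsert_errors = race_writes(db, "dag_upsert", UPSERT)
```

SQLite serialises writers, so the outcome does not depend on which thread commits first, and the assertions stay deterministic. Against Postgres or MySQL, the real test would race two sessions on the backend under test.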



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
