ephraimbuddy commented on code in PR #66788:
URL: https://github.com/apache/airflow/pull/66788#discussion_r3257248952
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -257,6 +257,21 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
)
+# Dialect-specific message prefixes for unique-constraint IntegrityErrors. Kept in sync with
+# ``airflow.api_fastapi.common.exceptions._UniqueConstraintErrorHandler``.
+_UNIQUE_CONSTRAINT_ERROR_PREFIXES = (
+ "UNIQUE constraint failed", # SQLite
+ "Duplicate entry", # MySQL
+ "violates unique constraint", # Postgres
Review Comment:
String-matching `exc.orig` across three drivers is fragile and tightly
couples this code to current driver wordings. Prefer fixing at the write site
with `INSERT ... ON CONFLICT DO NOTHING` (or equivalent merge for the
brand-new-dag path), so this `except` arm doesn't need to classify dialects at
all. If we keep this approach, please guard against `exc.orig is None`.
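
A minimal sketch of the write-site idea, using the stdlib `sqlite3` module
rather than Airflow's SQLAlchemy session. The `serialized_dag` table shape and
the `write_dag_row` helper are hypothetical stand-ins, not the real model or
API; the point is only that the database resolves the collision, so no
`IntegrityError` is raised and nothing has to match driver messages.

```python
import sqlite3


def write_dag_row(conn, dag_id, payload):
    """Insert a row unless dag_id already exists; return True if inserted.

    Hypothetical helper: stands in for the real write path.
    """
    cur = conn.execute(
        "INSERT INTO serialized_dag (dag_id, payload) VALUES (?, ?) "
        "ON CONFLICT (dag_id) DO NOTHING",
        (dag_id, payload),
    )
    # rowcount is the number of rows this statement actually inserted.
    return cur.rowcount == 1


# Illustrative schema only; the real table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, payload TEXT)")

first = write_dag_row(conn, "example_dag", "v1")   # new row
second = write_dag_row(conn, "example_dag", "v2")  # collides, is skipped
rows = conn.execute("SELECT dag_id, payload FROM serialized_dag").fetchall()
```

The second call neither raises nor overwrites, which is the behaviour the
`except IntegrityError` arm is currently trying to reconstruct after the fact.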
---
Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -295,6 +310,29 @@ def _serialize_dag_capturing_errors(
return []
except OperationalError:
raise
+ except IntegrityError as exc:
Review Comment:
Even if we keep the dialect-prefix approach for now, `log.info(...)` here
fires on every collision. That is noisy on a healthy multi-DFP cluster, and if
a real unique-constraint regression ever lands it would be buried at the same
level as the benign races. Suggest `log.warning` at minimum, and including the
constraint name in the message if it is available on `exc.orig`.
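
A rough sketch of what that could look like, assuming the driver exception
exposes a psycopg-style `diag.constraint_name`; other drivers may not, so the
lookup is defensive. `describe_integrity_error`, `log_collision`, the logger
name and the fake exception class are all hypothetical, and the guard for
`exc.orig is None` is folded in.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger("dag_processing.collection.sketch")  # hypothetical name


def describe_integrity_error(exc):
    """Build a log message without assuming exc.orig or its attributes exist."""
    orig = getattr(exc, "orig", None)
    if orig is None:
        return "IntegrityError without an underlying driver exception"
    # Assumption: psycopg-style drivers attach diag.constraint_name.
    diag = getattr(orig, "diag", None)
    constraint = getattr(diag, "constraint_name", None)
    if constraint:
        return f"unique-constraint collision on {constraint!r}: {orig}"
    return f"integrity error (constraint name unavailable): {orig}"


def log_collision(exc):
    """Log at WARNING so collisions stay visible, and return the message."""
    message = describe_integrity_error(exc)
    log.warning("Skipping serialized Dag write: %s", message)
    return message


class FakeDriverError(Exception):
    """Stand-in for a DBAPI exception, used only for this example."""


driver_exc = FakeDriverError("duplicate key value violates unique constraint")
driver_exc.diag = SimpleNamespace(constraint_name="serialized_dag_pkey")
with_name = log_collision(SimpleNamespace(orig=driver_exc))
without_orig = log_collision(SimpleNamespace(orig=None))
```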
---
Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting
##########
airflow-core/tests/unit/dag_processing/test_collection.py:
##########
@@ -651,6 +651,109 @@ def test_serialized_dag_errors_are_import_errors(
assert len(dag_import_error_listener.existing) == 0
assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace
+ @pytest.mark.parametrize(
+ "orig_message",
+ [
+ pytest.param(
Review Comment:
This test asserts that the *handler* routes a mocked `IntegrityError`
correctly; it does not assert that two concurrent DFPs actually race or that
the surrounding `update_dag_parsing_results_in_db` cycle recovers. Please add a
`db_test` that races two sessions on the same brand-new Dag (e.g. two threads
calling `SerializedDagModel.write_dag` against the same `dag_id`), so we have a
regression test for the actual scenario, not just for the symptom string.
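
The shape of such a test might look like the sketch below. It uses stdlib
`sqlite3` and `threading` against a temporary file database purely for
illustration; a real version would use Airflow's test database and sessions.
The plain `INSERT` stands in for `SerializedDagModel.write_dag`, and
`race_two_writers` and the table layout are hypothetical.

```python
import os
import sqlite3
import tempfile
import threading


def race_two_writers(db_path, dag_id):
    """Race two connections inserting the same dag_id; return (outcomes, rows)."""
    setup = sqlite3.connect(db_path)
    setup.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY)")
    setup.commit()
    setup.close()

    barrier = threading.Barrier(2)  # release both writers at the same moment
    outcomes = []
    outcomes_lock = threading.Lock()

    def writer():
        # Each thread gets its own connection, like two separate processors.
        conn = sqlite3.connect(db_path, timeout=30, isolation_level=None)
        try:
            barrier.wait()
            try:
                conn.execute(
                    "INSERT INTO serialized_dag (dag_id) VALUES (?)", (dag_id,)
                )
                result = "inserted"
            except sqlite3.IntegrityError:
                result = "lost_race"
        finally:
            conn.close()
        with outcomes_lock:
            outcomes.append(result)

    threads = [threading.Thread(target=writer) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    check = sqlite3.connect(db_path)
    row_count = check.execute("SELECT COUNT(*) FROM serialized_dag").fetchone()[0]
    check.close()
    return sorted(outcomes), row_count


with tempfile.TemporaryDirectory() as tmp_dir:
    outcomes, row_count = race_two_writers(
        os.path.join(tmp_dir, "race.db"), "brand_new_dag"
    )
```

The assertions that matter are that exactly one writer wins, the loser is
handled rather than crashing the cycle, and exactly one row exists afterwards.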
---
Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.