uranusjr commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3425201903
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -507,13 +543,18 @@ def _queue_dagruns(
# mapped) tasks update the same asset, this can fail with a unique
# constraint violation.
#
- # If we support it, use ON CONFLICT to do nothing, otherwise
+ # If we support it, use ON CONFLICT to update, otherwise
# "fallback" to running this in a nested transaction. This is needed
# so that the adding of these rows happens in the same transaction
# where `ti.state` is changed.
- if get_dialect_name(session) == "postgresql":
- return cls._queue_dagruns_nonpartitioned_postgres(asset_id, non_partitioned_dags, session)
- return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, non_partitioned_dags, session)
+ dialect_name = get_dialect_name(session)
+ if dialect_name in ("postgresql", "sqlite"):
+ return cls._queue_dagruns_nonpartitioned_conflict_update(
+ asset_id, non_partitioned_dags, event, session, dialect_name
+ )
+ if dialect_name == "mysql":
+ return cls._queue_dagruns_nonpartitioned_mysql(asset_id, non_partitioned_dags, event, session)
+ return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, non_partitioned_dags, event, session)
Review Comment:
I don’t think the final slow-path fallback would ever be reached in
practice, since Airflow only supports three database backends
(PostgreSQL, MySQL, and SQLite). Let’s change the branches to
```python
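if dialect_name == "mysql":
    # MySQL
    return cls._queue_dagruns_nonpartitioned_mysql(asset_id, non_partitioned_dags, event, session)
# Others, use conflict update.
return cls._queue_dagruns_nonpartitioned_conflict_update(
    asset_id, non_partitioned_dags, event, session, dialect_name
)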
```
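For reference, here is a minimal standalone sketch of what "conflict update" means on the SQLite side, using only the standard-library `sqlite3` module. The table layout, the `queue_dagrun` helper, and the choice to refresh `created_at` on conflict are assumptions made for illustration; they are not taken from the PR and may not match what `_queue_dagruns_nonpartitioned_conflict_update` actually does.

```python
import sqlite3


def queue_dagrun(conn, asset_id, target_dag_id, created_at):
    """Insert a queue row, or update it if the (asset, DAG) pair already exists.

    Hypothetical helper: a duplicate key is resolved in a single statement
    instead of raising a unique constraint violation.
    """
    conn.execute(
        """
        INSERT INTO asset_dag_run_queue (asset_id, target_dag_id, created_at)
        VALUES (?, ?, ?)
        ON CONFLICT (asset_id, target_dag_id)
        DO UPDATE SET created_at = excluded.created_at
        """,
        (asset_id, target_dag_id, created_at),
    )


conn = sqlite3.connect(":memory:")
# Simplified stand-in schema (assumption), keyed on the (asset, DAG) pair.
conn.execute(
    """
    CREATE TABLE asset_dag_run_queue (
        asset_id INTEGER NOT NULL,
        target_dag_id TEXT NOT NULL,
        created_at TEXT NOT NULL,
        PRIMARY KEY (asset_id, target_dag_id)
    )
    """
)

# Two updates to the same asset for the same DAG: the second one updates
# the existing row rather than failing.
queue_dagrun(conn, 1, "dag_a", "2024-01-01T00:00:00")
queue_dagrun(conn, 1, "dag_a", "2024-01-02T00:00:00")

rows = conn.execute(
    "SELECT asset_id, target_dag_id, created_at FROM asset_dag_run_queue"
).fetchall()
```

PostgreSQL accepts the same `ON CONFLICT ... DO UPDATE` clause, which is presumably why the two dialects can share one code path while MySQL needs its own branch.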