fuechter opened a new issue, #30636: URL: https://github.com/apache/airflow/issues/30636
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Airflow version: 2.5.1

I have a Python file that generates DAGs dynamically, and every time that file changes I have to run `./airflow.sh dags reserialize`. Sometimes this command fails with an error like the one below:

```
[2023-04-13 19:46:54,669] {dag.py:2690} INFO - Sync 35 DAGs
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/dag_command.py", line 492, in dag_reserialize
    dagbag.sync_to_db(session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 645, in sync_to_db
    for attempt in run_with_db_retries(logger=self.log):
  File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 384, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 351, in iter
    return fut.result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 660, in sync_to_db
    self.dags.values(), processor_subdir=processor_subdir, session=session
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 2701, in bulk_write_to_db
    orm_dags: list[DagModel] = with_row_locks(query, of=DagModel, session=session).all()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2773, in all
    return self._iter().all()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2919, in _iter
    execution_options={"_sa_orm_load_options": self.load_options},
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1713, in execute
    conn = self._connection_for_bind(bind)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1553, in _connection_for_bind
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
    self._assert_active()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 608, in _assert_active
    code="7s2a",
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().
Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
DETAIL:  Key (dag_id)=(96ddcc3b-900a-44a7-bda9-81b9eefde4d2-dynamic-dag-hourly-days) already exists.
```

### What you think should happen instead

I think Airflow should reserialize the DAGs without failing on a duplicate key.

### How to reproduce

This is the Python file that registers the dynamic DAGs (shortened to focus on the problem). Here the data is hard-coded as a Python dict, but in the real scenario it comes from a JSON file that another DAG updates from time to time.

```python
import pytz
from datetime import datetime, timedelta

from airflow.decorators import dag, task

data = {
    'companies': [{
        'id': '1233',
        'slug': 'company-1',
        'timezone': {'id': 'America/Sao_Paulo'},
        'connections': [{'id': 'facebook'}, {'id': 'instagram'}],
    }, {
        'id': '1234',
        'slug': 'company-2',
        'timezone': {'id': 'America/Sao_Paulo'},
        'connections': [{'id': 'facebook'}, {'id': 'instagram'}, {'id': 'twitter'}],
    }]
}


def create_dynamic_dag(dag_id, company):
    schedule_interval = "* 5 * * * *"
    local_tz = pytz.timezone(company['timezone']['id'])
    start_date = datetime.now(local_tz) - timedelta(days=7, hours=0)

    @dag(
        dag_id=dag_id,
        schedule=schedule_interval,
        start_date=start_date,
        catchup=False,
        max_active_runs=1,
    )
    def dynamic_dag():
        @task.branch
        def ping():
            # ...code to ping
            # return 'done' or 'notify_error'
            pass

        @task
        def notify_error():
            # ...code to notify error
            pass

        @task
        def done():
            # ...code to done
            pass

        ping_instance = ping()
        notify_error_instance = notify_error()
        done_instance = done()
        ping_instance >> [done_instance, notify_error_instance]

    dagD = dynamic_dag()
    globals()[dag_id] = dagD
    return dagD


for company in data['companies']:
    for connection in company['connections']:
        dag_id = f"{company['id']}-{company['slug']}-{connection['id']}"
        dagD = create_dynamic_dag(dag_id, company)
```

### Operating System

Mac, but I'm running Airflow in Docker

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

I simply followed the documentation for [Running Airflow in Docker](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html).

### Anything else

Sometimes this error doesn't happen, but it's happening more and more frequently.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
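As a side check on the reproducer (a quick standalone sketch, independent of Airflow; the `data` dict below mirrors the shortened example above, and `generated_dag_ids` is a hypothetical helper that just replays the reproducer's f-string scheme): the dag_id scheme in the file never emits the same ID twice, so the duplicate key reported by Postgres does not come from the file itself registering one dag_id twice.

```python
from collections import Counter

# Mirrors the shortened `data` dict from the reproducer above
# (timezone keys omitted, as they play no role in dag_id generation).
data = {
    "companies": [
        {"id": "1233", "slug": "company-1",
         "connections": [{"id": "facebook"}, {"id": "instagram"}]},
        {"id": "1234", "slug": "company-2",
         "connections": [{"id": "facebook"}, {"id": "instagram"}, {"id": "twitter"}]},
    ]
}


def generated_dag_ids(data):
    """Replay the dag_id scheme used by the reproducer's registration loop."""
    return [
        f"{company['id']}-{company['slug']}-{connection['id']}"
        for company in data["companies"]
        for connection in company["connections"]
    ]


ids = generated_dag_ids(data)
# Any dag_id occurring more than once would show up here; with this
# data there are none, so the IDs themselves are unique.
duplicates = [dag_id for dag_id, n in Counter(ids).items() if n > 1]
```
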