jiananyim opened a new issue, #40082: URL: https://github.com/apache/airflow/issues/40082
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.8.1

### What happened?

Hello, we recently encountered the following error, and we can confirm that the DAG `viewing_station_grid_data_analysis` is defined only once in our setup.

```
[2024-06-05T21:50:11.420+0000] {{logging_mixin.py:188}} INFO - [2024-06-05T21:50:11.419+0000] {{dagbag.py:647}} ERROR - Failed to write serialized DAG: /usr/local/airflow/dags/vjobs_dynamic_dag.py
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 635, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/serialized_dag.py", line 157, in write_dag
    if session.scalar(
       ^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1747, in scalar
    return self.execute(
           ^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
    self._assert_active()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "serialized_dag_pkey"
DETAIL:  Key (dag_id)=(viewing_station_grid_data_analysis) already exists.

[SQL: INSERT INTO serialized_dag (dag_id, fileloc, fileloc_hash, data, data_compressed, last_updated, dag_hash, processor_subdir) VALUES (%(dag_id)s, %(fileloc)s, %(fileloc_hash)s, %(data)s, %(data_compressed)s, %(last_updated)s, %(dag_hash)s, %(processor_subdir)s)]
[parameters: ({'dag_id': 'db_snapshots_valarm_hourly', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1679702400.0, "_dag_id": "db_snapshots_valarm_hourly", "timetable": {"__type": "airflow.timetables.interval.Cr ... (104310 characters truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None, 'last_updated': datetime.datetime(2024, 6, 6, 1, 10, 11, 534883, tzinfo=Timezone('UTC')), 'dag_hash': '5be21fc04823d4a5ed5b3733d21cabcd', 'processor_subdir': '/usr/local/airflow/dags'},
 {'dag_id': 'enterprise_metrics_command_ingestion', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1686096000.0, "_dag_id": "enterprise_metrics_command_ingestion", "timetable": {"__type": "airflow.timetables.i ... (57998 characters truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None, 'last_updated': datetime.datetime(2024, 6, 6, 1, 10, 11, 856045, tzinfo=Timezone('UTC')), 'dag_hash': '296356c08d551d90c140c848369cfa92', 'processor_subdir': '/usr/local/airflow/dags'},
 {'dag_id': 'salesforce_snapshot_delta', 'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": {"start_date": 1693872000.0, "_dag_id": "salesforce_snapshot_delta", "timetable": {"__type": "airflow.timetables.interval.Cro ... (186206 characters truncated) ... "limit_memory": "16Gi", "limit_cpu": "2048m"}, "volumes": [], "volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], "params": {}}}', 'data_compressed': None
```

We have 5 schedulers running. For DAG generation, we have a dynamically generated DAG file that can produce over a hundred DAGs with fixed names. It appears that these 5 schedulers are writing to the database simultaneously, causing contention. I would therefore like to understand Airflow's locking mechanism and how dynamic DAGs are written:

1. How does Airflow handle DAGs generated from the same file during processing? Are they written to the database over a single connection, or in separate batches?
2. Is there any locking mechanism between schedulers, or should the database be configured to allow only one writer?
3. Do you have any other suggestions for handling such errors?

Thank you very much!

### What you think should happen instead?

Airflow should process the file without raising a duplicate-key error.

### How to reproduce

Generate a large number of DAGs dynamically from a single file, with multiple schedulers running.
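For context, our dynamic DAG file follows the usual pattern of registering many DAGs from one module via `globals()`, which is why a single file parse produces 100+ serialized rows at once. The sketch below is illustrative only: `make_dag`, `DAG_CONFIGS`, and the stub `DAG` class are hypothetical placeholders (the real file uses `airflow.models.DAG`; the stub just keeps the sketch self-contained).

```python
# Sketch of the dynamic-DAG pattern (hypothetical names throughout).
# In a real DAG file, DAG would be airflow.models.DAG; a minimal stub
# stands in for it here so the sketch runs without Airflow installed.

class DAG:  # stand-in for airflow.models.DAG
    def __init__(self, dag_id, schedule=None):
        self.dag_id = dag_id
        self.schedule = schedule

# One config entry per generated DAG; in practice this might come from
# YAML, a database, or another config source.
DAG_CONFIGS = [
    {"dag_id": f"generated_dag_{i}", "schedule": "@hourly"}
    for i in range(100)
]

def make_dag(cfg):
    return DAG(dag_id=cfg["dag_id"], schedule=cfg["schedule"])

# Assigning each DAG object into the module's globals() is how one file
# exposes many DAGs: every module-level DAG is collected when the file
# is parsed, so all of them get serialized together.
for cfg in DAG_CONFIGS:
    globals()[cfg["dag_id"]] = make_dag(cfg)
```

With this pattern, every scheduler that parses the file sees the same fixed set of `dag_id`s, which is what makes concurrent serialization writes collide on the primary key.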
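Regarding question 3: a generic database-side way to make concurrent writers idempotent is an upsert (`INSERT ... ON CONFLICT DO UPDATE`), which PostgreSQL supports natively; I am not claiming this is what Airflow's `write_dag` does internally. The table below is a simplified, hypothetical stand-in for `serialized_dag`, demonstrated with SQLite (which accepts the same `ON CONFLICT` syntax since 3.24):

```python
import sqlite3

# Simplified stand-in for the serialized_dag table (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE serialized_dag ("
    " dag_id TEXT PRIMARY KEY,"
    " dag_hash TEXT,"
    " last_updated TEXT)"
)

def write_dag(dag_id, dag_hash, ts):
    # An upsert never raises a duplicate-key error: if two writers race
    # on the same dag_id, the loser's INSERT becomes an UPDATE instead
    # of a UniqueViolation.
    conn.execute(
        "INSERT INTO serialized_dag (dag_id, dag_hash, last_updated)"
        " VALUES (?, ?, ?)"
        " ON CONFLICT(dag_id) DO UPDATE SET"
        "  dag_hash = excluded.dag_hash,"
        "  last_updated = excluded.last_updated",
        (dag_id, dag_hash, ts),
    )
    conn.commit()

# Two "schedulers" writing the same dag_id: the second write wins
# quietly; no error, and the row count stays at one.
write_dag("viewing_station_grid_data_analysis", "hash-a", "2024-06-05T21:50:11")
write_dag("viewing_station_grid_data_analysis", "hash-b", "2024-06-05T21:50:12")
```

An alternative with the same effect is catching the unique-violation in the writer, rolling the session back, and retrying as an update; the upsert just pushes that race resolution into the database.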
### Operating System

Amazon MWAA

### Versions of Apache Airflow Providers

_No response_

### Deployment

Amazon (AWS) MWAA

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)