jiananyim opened a new issue, #40082:
URL: https://github.com/apache/airflow/issues/40082

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.1
   
   ### What happened?
   
   Hello,
   
   We recently encountered the following error. We can confirm that the DAG `viewing_station_grid_data_analysis` is defined only once in our setup.
   ```
   [2024-06-05T21:50:11.420+0000] {{logging_mixin.py:188}} INFO - 
[2024-06-05T21:50:11.419+0000] {{dagbag.py:647}} ERROR - Failed to write 
serialized DAG: /usr/local/airflow/dags/vjobs_dynamic_dag.py
   Traceback (most recent call last):
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py",
 line 635, in _serialize_dag_capturing_errors
       dag_was_updated = SerializedDagModel.write_dag(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py",
 line 76, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/serialized_dag.py",
 line 157, in write_dag
       if session.scalar(
          ^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 1747, in scalar
       return self.execute(
              ^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 1716, in execute
       conn = self._connection_for_bind(bind)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 1555, in _connection_for_bind
       return self._transaction._connection_for_bind(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 724, in _connection_for_bind
       self._assert_active()
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 604, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates 
unique constraint "serialized_dag_pkey"
   DETAIL:  Key (dag_id)=(viewing_station_grid_data_analysis) already exists.
   
   [SQL: INSERT INTO serialized_dag (dag_id, fileloc, fileloc_hash, data, 
data_compressed, last_updated, dag_hash, processor_subdir) VALUES (%(dag_id)s, 
%(fileloc)s, %(fileloc_hash)s, %(data)s, %(data_compressed)s, %(last_updated)s, 
%(dag_hash)s, %(processor_subdir)s)]
   [parameters: ({'dag_id': 'db_snapshots_valarm_hourly', 'fileloc': 
'/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 
'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": 
{"start_date": 1679702400.0, "_dag_id": "db_snapshots_valarm_hourly", 
"timetable": {"__type": "airflow.timetables.interval.Cr ... (104310 characters 
truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", "volumes": [], 
"volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], 
"params": {}}}', 'data_compressed': None, 'last_updated': 
datetime.datetime(2024, 6, 6, 1, 10, 11, 534883, tzinfo=Timezone('UTC')), 
'dag_hash': '5be21fc04823d4a5ed5b3733d21cabcd', 'processor_subdir': 
'/usr/local/airflow/dags'}, {'dag_id': 'enterprise_metrics_command_ingestion', 
'fileloc': '/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 
'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": 
{"start_date": 1686096000.0, "_dag_id": "enterprise_metrics_comman
 d_ingestion", "timetable": {"__type": "airflow.timetables.i ... (57998 
characters truncated) ... ': {\'cpu\': \'256m\', \'memory\': \'2Gi\'}}", 
"volumes": [], "volume_mounts": [], "cluster_context": "aws"}], 
"dag_dependencies": [], "params": {}}}', 'data_compressed': None, 
'last_updated': datetime.datetime(2024, 6, 6, 1, 10, 11, 856045, 
tzinfo=Timezone('UTC')), 'dag_hash': '296356c08d551d90c140c848369cfa92', 
'processor_subdir': '/usr/local/airflow/dags'}, {'dag_id': 
'salesforce_snapshot_delta', 'fileloc': 
'/usr/local/airflow/dags/dynamic_dags/vjobs_dynamic_dag_batch_7.py', 
'fileloc_hash': 71059898310842332, 'data': '{"__version": 1, "dag": 
{"start_date": 1693872000.0, "_dag_id": "salesforce_snapshot_delta", 
"timetable": {"__type": "airflow.timetables.interval.Cro ... (186206 characters 
truncated) ... "limit_memory": "16Gi", "limit_cpu": "2048m"}, "volumes": [], 
"volume_mounts": [], "cluster_context": "aws"}], "dag_dependencies": [], 
"params": {}}}', 'data_compressed': None
   ```
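   The failure pattern in the traceback (a unique-constraint violation that aborts the transaction, after which the session raises until it is rolled back) can be reproduced in a few lines without Airflow. This is a minimal sketch using SQLite's `IntegrityError` in place of PostgreSQL's `UniqueViolation`; the table is a simplified stand-in for `serialized_dag`, not Airflow's actual schema.

```python
# Minimal stand-alone reproduction of the failure pattern above, using
# SQLite instead of PostgreSQL: a duplicate primary key aborts the
# insert, and the connection must be rolled back before reuse --
# analogous to the PendingRollbackError in the traceback.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO serialized_dag VALUES ('viewing_station_grid_data_analysis', '{}')")
conn.commit()

try:
    # A second writer inserting the same dag_id hits the primary-key constraint.
    conn.execute("INSERT INTO serialized_dag VALUES ('viewing_station_grid_data_analysis', '{}')")
except sqlite3.IntegrityError as exc:
    print(f"duplicate key: {exc}")
    conn.rollback()  # reset the aborted transaction before reusing the connection

# After the rollback the connection is usable again, and the row exists once.
count = conn.execute("SELECT COUNT(*) FROM serialized_dag").fetchone()[0]
print(count)  # -> 1
```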
   
   We have 5 schedulers running. For DAG generation, we have a single DAG file that dynamically generates over a hundred DAGs with fixed names.
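   For context, this is the general shape of our generation pattern: one file producing many DAGs with fixed ids. The `DAG` class below is a minimal stand-in for `airflow.models.DAG` so the sketch runs without an Airflow install, and the id prefix and batch size are illustrative, not our real values.

```python
# Sketch of one file dynamically generating many DAGs with fixed names.
# `DAG` is a stand-in for airflow.models.DAG; the names are illustrative.
class DAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id

DAG_IDS = [f"vjob_batch_7_dag_{i}" for i in range(100)]

for dag_id in DAG_IDS:
    # Airflow's DAG-file processor discovers DAGs by scanning module
    # globals, so each dynamically created DAG is registered there.
    globals()[dag_id] = DAG(dag_id)

# Every generated DAG is discoverable under its fixed name.
print(len([name for name in globals() if name.startswith("vjob_batch_7_dag_")]))  # -> 100
```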
   
   It appears that these 5 schedulers write to the database simultaneously, causing contention. I would therefore like to understand Airflow's locking mechanism and how dynamically generated DAGs are written.
   
   1. How does Airflow handle DAGs generated from the same file during processing? Are they written to the database over a single connection, or in separate batches?
   2. Is there any locking mechanism between schedulers, or should the database be configured to allow only a single writer?
   3. Do you have any other suggestions for handling such errors?
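   On question 3: one generic way to make concurrent writers of the same row idempotent is an upsert, so the second writer updates the existing row instead of failing on the primary key. The sketch below uses SQLite's `INSERT ... ON CONFLICT`, which is analogous to PostgreSQL's; it illustrates the technique in general and is not how Airflow 2.8 implements `SerializedDagModel.write_dag`.

```python
# Upsert sketch: two concurrent "schedulers" writing the same dag_id
# converge on the last write instead of raising a duplicate-key error.
# Simplified stand-in table; not Airflow's actual serialized_dag schema.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, dag_hash TEXT)")

def write_dag(dag_id, dag_hash):
    # Insert-or-update keyed on dag_id (PostgreSQL's ON CONFLICT is analogous).
    conn.execute(
        "INSERT INTO serialized_dag (dag_id, dag_hash) VALUES (?, ?) "
        "ON CONFLICT(dag_id) DO UPDATE SET dag_hash = excluded.dag_hash",
        (dag_id, dag_hash),
    )
    conn.commit()

# Both writers succeed; the row reflects the last writer's hash.
write_dag("viewing_station_grid_data_analysis", "hash-from-scheduler-1")
write_dag("viewing_station_grid_data_analysis", "hash-from-scheduler-2")

row = conn.execute("SELECT dag_hash FROM serialized_dag").fetchone()
print(row[0])  # -> hash-from-scheduler-2
```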
   
   
   Thank you very much!
   
   
   
   ### What you think should happen instead?
   
   Airflow should serialize the DAGs without raising the duplicate key error.
   
   
   
   ### How to reproduce
   
   Dynamic DAG generation with a large number of DAGs.
   
   ### Operating System
   
   amazon mwaa
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
