jscheffl commented on code in PR #42913:
URL: https://github.com/apache/airflow/pull/42913#discussion_r1803495554


##########
airflow/models/dag_version.py:
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import random
+import string
+from typing import TYPE_CHECKING
+
+from sqlalchemy import Column, ForeignKey, Integer, select
+from sqlalchemy.orm import relationship
+
+from airflow.models.base import Base, StringID
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+log = logging.getLogger(__name__)
+
+
+class DagVersion(Base):
+    """Model to track the versions of DAGs in the database."""
+
+    __tablename__ = "dag_version"
+    id = Column(Integer, primary_key=True)
+    version_number = Column(Integer)
+    version_name = Column(StringID())
+    dag_id = Column(StringID(), ForeignKey("dag.dag_id", ondelete="CASCADE"))
+    dag_model = relationship("DagModel", back_populates="dag_versions")
+    dag_code = relationship("DagCode", back_populates="dag_version", 
uselist=False)
+    serialized_dag = relationship("SerializedDagModel", 
back_populates="dag_version", uselist=False)
+    dag_runs = relationship("DagRun", back_populates="dag_version")
+    task_instances = relationship("TaskInstance", back_populates="dag_version")
+
+    def __init__(
+        self,
+        *,
+        dag_id: str,
+        version_number: int,
+        dag_code: DagCode,
+        serialized_dag: SerializedDagModel,
+        version_name: str | None = None,
+    ):
+        self.dag_id = dag_id
+        self.version_number = version_number
+        self.dag_code = dag_code
+        self.serialized_dag = serialized_dag
+        self.version_name = version_name
+
+    def __repr__(self):
+        return f"<DagVersion {self.dag_id} - {self.version_name}>"
+
+    @classmethod
+    def _generate_random_string(cls):
+        letters = string.ascii_letters + string.digits
+        return "dag-" + "".join(random.choice(letters) for i in range(10))
+
+    @classmethod
+    @provide_session
+    def _generate_unique_random_string(cls, session: Session = NEW_SESSION):
+        while True:
+            random_str = cls._generate_random_string()
+            # Check if the generated string exists
+            if not session.scalar(select(cls).where(cls.version_name == 
random_str)):
+                return random_str
+
+    @classmethod
+    @provide_session
+    def write_dag(
+        cls,
+        *,
+        dag_id: str,
+        dag_code: DagCode,
+        serialized_dag: SerializedDagModel,
+        version_name: str | None = None,
+        session: Session = NEW_SESSION,
+    ):
+        """Write a new DagVersion into database."""
+        existing_dag_version = session.scalar(
+            select(cls).where(cls.dag_id == 
dag_id).order_by(cls.version_number.desc()).limit(1)
+        )
+        version_number = 1
+
+        if existing_dag_version:
+            version_number = existing_dag_version.version_number + 1

Review Comment:
   This code does not look thread-safe. If multiple instances run 
concurrently, the application-level ID generation could produce clashes. If 
this is meant to represent an auto-increment, it should rather be pushed down 
to the database engine (e.g. an identity column).



##########
airflow/models/dag.py:
##########
@@ -2463,7 +2471,7 @@ def create_dagrun(
         conf: dict | None = None,
         run_type: DagRunType | None = None,
         session: Session = NEW_SESSION,
-        dag_hash: str | None = None,
+        dag_version_id: int | None = None,

Review Comment:
   Are we sure that we want to have (only) an integer value for the DAG 
version? Who would be responsible for incrementing it?
   
   I am thinking about a backup/restore: how can we be sure that in such cases 
the IDs will not overwrite each other?
   
   Besides the question about `int`: If it is an integer then it is not really 
an ID, we should rather call it `dag_version` if it represents a numeric value 
(I think)



##########
airflow/migrations/versions/0039_3_0_0_add_dag_versioning.py:
##########
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+add dag versioning.
+
+Revision ID: 2b47dc6bc8df
+Revises: fb2d4922cd79
+Create Date: 2024-10-09 05:44:04.670984
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "2b47dc6bc8df"
+down_revision = "fb2d4922cd79"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+    """Apply add dag versioning."""
+    with op.batch_alter_table("dag", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("version_name", sa.String(length=250), 
nullable=True))
+
+    op.create_table(
+        "dag_version",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("version_number", sa.Integer(), nullable=True),
+        sa.Column("version_name", sa.String(length=250), nullable=True),
+        sa.Column("dag_id", sa.String(length=250), nullable=True),
+        sa.Column("dag_code_id", sa.Integer(), nullable=True),
+        sa.Column("serialized_dag_id", sa.Integer(), nullable=True),
+        sa.ForeignKeyConstraint(("dag_id",), ["dag.dag_id"], 
name=op.f("dag_version_dag_id_fkey")),
+        sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),

Review Comment:
   Is the ID the only primary key, without the dag_id being part of the 
primary key? So the ID is a global ID irrespective of the DAG linked?
   Which code part/logic is responsible for auto-incrementing this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to