jscheffl commented on code in PR #66674:
URL: https://github.com/apache/airflow/pull/66674#discussion_r3222431949


##########
providers/edge3/tests/unit/edge3/models/test_db.py:
##########
@@ -265,6 +265,51 @@ def 
test_initdb_stamps_and_upgrades_when_tables_exist_without_version(self, sess
         assert "concurrency" in columns
         assert "team_name" in columns
 
+    def 
test_upgradedb_stamps_and_upgrades_when_tables_exist_without_version(self, 
session):
+        """Test upgradedb runs incremental migrations when tables exist but 
alembic version table does not."""
+        from sqlalchemy import inspect, text
+
+        from airflow import settings
+        from airflow.providers.edge3.models.db import EdgeDBManager
+
+        manager = EdgeDBManager(session)
+
+        # Simulate pre-alembic state: tables exist but no version table and no 
concurrency column
+        with settings.engine.begin() as conn:
+            inspector = inspect(conn)
+            if inspector.has_table("alembic_version_edge3"):
+                conn.execute(text("DELETE FROM alembic_version_edge3"))
+            if "concurrency" in {col["name"] for col in 
inspector.get_columns("edge_worker")}:
+                from alembic.migration import MigrationContext
+                from alembic.operations import Operations
+
+                mc = MigrationContext.configure(conn, opts={"render_as_batch": 
True})
+                ops = Operations(mc)
+                ops.drop_column("edge_worker", "concurrency")
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_failed", sa.INTEGER(), 
autoincrement=False, default=0, nullable=False),
+                )
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False, 
default=0, nullable=False),
+                )
+                ops.add_column(
+                    "edge_worker",
+                    sa.Column("jobs_success", sa.INTEGER(), 
autoincrement=False, default=0, nullable=False),
+                )
+
+        # upgradedb() should detect tables exist, stamp to base, then upgrade
+        manager.upgradedb()
+
+        with settings.engine.connect() as conn:
+            version = conn.execute(text("SELECT version_num FROM 
alembic_version_edge3")).scalar()
+            columns = {col["name"] for col in 
inspect(conn).get_columns("edge_worker")}
+
+        assert version == "c6b3c3d093fd"

Review Comment:
   This is a bit danger-zone. Whenever we change schema in future we need to 
adjust test. Is there a way to test with an abstract or imported schema version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to