anmolxlight commented on code in PR #66674:
URL: https://github.com/apache/airflow/pull/66674#discussion_r3222502354
##########
providers/apache/kafka/tests/unit/apache/kafka/triggers/test_msg_queue.py:
##########
Review Comment:
Fixed. Removed the unrelated Kafka test stabilization from this PR so the
diff is limited to the DB manager fix.
##########
airflow-core/src/airflow/utils/db_manager.py:
##########
Review Comment:
Fixed. Removed the provider and Kafka changes from this PR; the diff now
contains only the airflow-core DB manager change and its unit test, so it can be
considered for backporting on its own.
##########
providers/edge3/src/airflow/providers/edge3/models/db.py:
##########
@@ -61,6 +61,67 @@ class EdgeDBManager(BaseDBManager):
supports_table_dropping = True
revision_heads_map = _REVISION_HEADS_MAP
+ # TODO: Remove these compatibility overrides once the minimum supported
+ # Airflow version is 3.3, which includes the equivalent BaseDBManager
implementation.
Review Comment:
Fixed by splitting the scope here: provider compatibility changes were
removed from this PR. This PR now carries only the airflow-core fix intended
for backport consideration.
##########
providers/edge3/src/airflow/providers/edge3/models/db.py:
##########
@@ -61,6 +61,67 @@ class EdgeDBManager(BaseDBManager):
supports_table_dropping = True
revision_heads_map = _REVISION_HEADS_MAP
+ # TODO: Remove these compatibility overrides once the minimum supported
+ # Airflow version is 3.3, which includes the equivalent BaseDBManager
implementation.
+ def _has_existing_manager_tables(self) -> bool:
+ """Return whether any table managed by this DB manager already
exists."""
+ inspector = inspect(self.session.get_bind())
+ table_names_by_schema: dict[str | None, set[str]] = {}
+ for table in self.metadata.tables.values():
+ table_names_by_schema.setdefault(table.schema,
set()).add(table.name)
+
+ for schema, table_names in table_names_by_schema.items():
+ existing_table_names =
set(inspector.get_table_names(schema=schema))
+ if table_names.intersection(existing_table_names):
+ return True
+ return False
+
+ def _get_base_revision(self, config=None) -> str:
+ """Return the first/base Alembic revision for this DB manager."""
+ script = self.get_script_object(config)
+ for revision in script.walk_revisions():
+ if revision.down_revision is None:
+ return revision.revision
+ raise RuntimeError(f"No base revision found for
{self.__class__.__name__}")
+
+ def _stamp_base_revision(self, config) -> None:
+ """Stamp the database to this DB manager's base Alembic revision."""
+ from alembic import command
+
+ base_revision = self._get_base_revision(config)
+ self.log.info(
+ "%s tables already exist without an Alembic version; stamping base
revision %s before upgrade",
+ self.__class__.__name__,
+ base_revision,
+ )
+ command.stamp(config, base_revision)
+
+ def upgradedb(self, to_revision=None, from_revision=None,
show_sql_only=False, use_migration_files=False):
Review Comment:
No longer applicable to this PR. The provider compatibility overrides have been
removed, so the unclear TODO and its scope question no longer apply here.
##########
providers/edge3/tests/unit/edge3/models/test_db.py:
##########
@@ -265,6 +265,51 @@ def
test_initdb_stamps_and_upgrades_when_tables_exist_without_version(self, sess
assert "concurrency" in columns
assert "team_name" in columns
+ def
test_upgradedb_stamps_and_upgrades_when_tables_exist_without_version(self,
session):
+ """Test upgradedb runs incremental migrations when tables exist but
alembic version table does not."""
+ from sqlalchemy import inspect, text
+
+ from airflow import settings
+ from airflow.providers.edge3.models.db import EdgeDBManager
+
+ manager = EdgeDBManager(session)
+
+ # Simulate pre-alembic state: tables exist but no version table and no
concurrency column
+ with settings.engine.begin() as conn:
+ inspector = inspect(conn)
+ if inspector.has_table("alembic_version_edge3"):
+ conn.execute(text("DELETE FROM alembic_version_edge3"))
+ if "concurrency" in {col["name"] for col in
inspector.get_columns("edge_worker")}:
+ from alembic.migration import MigrationContext
+ from alembic.operations import Operations
+
+ mc = MigrationContext.configure(conn, opts={"render_as_batch":
True})
+ ops = Operations(mc)
+ ops.drop_column("edge_worker", "concurrency")
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_failed", sa.INTEGER(),
autoincrement=False, default=0, nullable=False),
+ )
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_taken", sa.INTEGER(), autoincrement=False,
default=0, nullable=False),
+ )
+ ops.add_column(
+ "edge_worker",
+ sa.Column("jobs_success", sa.INTEGER(),
autoincrement=False, default=0, nullable=False),
+ )
+
+ # upgradedb() should detect tables exist, stamp to base, then upgrade
+ manager.upgradedb()
+
+ with settings.engine.connect() as conn:
+ version = conn.execute(text("SELECT version_num FROM
alembic_version_edge3")).scalar()
+ columns = {col["name"] for col in
inspect(conn).get_columns("edge_worker")}
+
+ assert version == "c6b3c3d093fd"
Review Comment:
No longer applicable to this PR. The provider-specific edge3 test was removed
when the change was narrowed to airflow-core.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]