ashb commented on code in PR #63185:
URL: https://github.com/apache/airflow/pull/63185#discussion_r2905507694
##########
airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py:
##########
@@ -43,63 +43,62 @@
def upgrade():
"""Make bundle_name not nullable."""
+ import contextlib
+
+ # We need the DagBundlesManager here to respect and validate the user
configured bundles instead of hardcoding 'dags-folder' and 'example_dags' as
the only two bundles.
+ from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+ user_config_bundles = DagBundlesManager().bundle_names
dialect_name = op.get_bind().dialect.name
- if dialect_name == "postgresql":
- op.execute(
- text("""
- INSERT INTO dag_bundle (name) VALUES
- ('example_dags'),
- ('dags-folder')
- ON CONFLICT (name) DO NOTHING;
- """)
- )
- if dialect_name == "mysql":
- op.execute(
- text("""
- INSERT IGNORE INTO dag_bundle (name) VALUES
- ('example_dags'),
- ('dags-folder');
- """)
- )
+ conn = op.get_bind()
+
+ exitstack = contextlib.ExitStack()
if dialect_name == "sqlite":
- op.execute(text("PRAGMA foreign_keys=OFF"))
- op.execute(
- text("""
- INSERT OR IGNORE INTO dag_bundle (name) VALUES
- ('example_dags'),
- ('dags-folder');
- """)
- )
+ # SQLite requires foreign key constraints to be disabled during batch
operations
+ conn = op.get_bind()
+ conn.execute(text("PRAGMA foreign_keys=OFF"))
+ exitstack.callback(conn.execute, text("PRAGMA foreign_keys=ON"))
- conn = op.get_bind()
- with ignore_sqlite_value_error(), op.batch_alter_table("dag", schema=None)
as batch_op:
- conn.execute(
- text(
- """
- UPDATE dag
- SET bundle_name =
- CASE
- WHEN fileloc LIKE '%/airflow/example_dags/%' THEN
'example_dags'
- ELSE 'dags-folder'
- END
- WHERE bundle_name IS NULL
- """
+ for bundle_name in user_config_bundles:
+ if dialect_name == "postgresql":
+ conn.execute(
+ text("INSERT INTO dag_bundle (name) VALUES (:name) ON CONFLICT
(name) DO NOTHING"),
+ {"name": bundle_name},
+ )
+ elif dialect_name == "mysql":
+ conn.execute(
+ text("INSERT IGNORE INTO dag_bundle (name) VALUES (:name)"),
+ {"name": bundle_name},
+ )
+ elif dialect_name == "sqlite":
+ conn.execute(
+ text("INSERT OR IGNORE INTO dag_bundle (name) VALUES (:name)"),
+ {"name": bundle_name},
)
- )
- # drop the foreign key temporarily and recreate it once both columns
are changed
- batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"),
type_="foreignkey")
- batch_op.alter_column("bundle_name", nullable=False,
existing_type=StringID())
- with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
- batch_op.alter_column("name", nullable=False, existing_type=StringID())
+ # Determine the fallback bundle for DAGs with NULL bundle_name.
+ # The default config always has 'dags-folder'; fall back to the first
configured bundle.
+ # This should be more correct than setting the dag_bundle_name as
'dags-folder' for all existing Dags.
+ default_bundle = user_config_bundles[0]
- with op.batch_alter_table("dag", schema=None) as batch_op:
- batch_op.create_foreign_key(
- batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"],
["name"]
- )
+ with exitstack:
+ with ignore_sqlite_value_error(), op.batch_alter_table("dag",
schema=None) as batch_op:
Review Comment:
Nit: put `ignore_sqlire_value_error` in the exitstack too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]