This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d478f939b48bb5df5207fe182ad5f77a87051234 Author: Ash Berlin-Taylor <ash_git...@firemirror.com> AuthorDate: Fri Nov 5 15:30:29 2021 +0000 Fix moving of dangling TaskInstance rows for SQL Server (#19425) SQL server uses a different syntax for creating a table from a select to the other DBs we support. And to make the "where_query" reusable across all DBs (SQL Server doesn't support `WHERE (col1,col2) IN ...`) the delete has been re-written too. (cherry picked from commit 3c45c12ed12a014978c0f1c9ca2fc3f32416b144) --- airflow/utils/db.py | 78 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 2da0722..2fcab15 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -709,10 +709,55 @@ def _format_dangling_error(source_table, target_table, invalid_count, reason): ) -def _move_dangling_run_data_to_new_table(session, source_table, target_table): +def _move_dangling_run_data_to_new_table(session, source_table: "Table", target_table_name: str): where_clause = "where dag_id is null or run_id is null or execution_date is null" - session.execute(text(f"create table {target_table} as select * from {source_table} {where_clause}")) - session.execute(text(f"delete from {source_table} {where_clause}")) + _move_dangling_table(session, source_table, target_table_name, where_clause) + + +def _move_dangling_table(session, source_table: "Table", target_table_name: str, where_clause: str): + dialect_name = session.get_bind().dialect.name + + delete_where = " AND ".join( + f"{source_table.name}.{c.name} = d.{c.name}" for c in source_table.primary_key.columns + ) + if dialect_name == "mssql": + session.execute( + text(f"select source.* into {target_table_name} from {source_table} as source {where_clause}") + ) + session.execute( + text( + f"delete from {source_table} from {source_table} join {target_table_name} AS d ON " + + delete_where + ) + ) + else: + # Postgres, MySQL and SQLite all have the same CREATE TABLE a AS SELECT ... syntax + session.execute( + text( + f"create table {target_table_name} as select source.* from {source_table} as source " + + where_clause + ) + ) + + # But different join-delete syntax. + if dialect_name == "mysql": + session.execute( + text( + f"delete {source_table} from {source_table} join {target_table_name} as d on " + + delete_where + ) + ) + elif dialect_name == "sqlite": + session.execute( + text( + f"delete from {source_table} where ROWID in (select {source_table}.ROWID from " + f"{source_table} as source join {target_table_name} as d on {delete_where})" + ) + ) + else: + session.execute( + text(f"delete from {source_table} using {target_table_name} as d where {delete_where}") + ) def check_run_id_null(session) -> Iterable[str]: @@ -720,7 +765,7 @@ def check_run_id_null(session) -> Iterable[str]: metadata = sqlalchemy.schema.MetaData(session.bind) try: - metadata.reflect(only=[DagRun.__tablename__]) + metadata.reflect(only=[DagRun.__tablename__], extend_existing=True, resolve_fks=False) except exc.InvalidRequestError: # Table doesn't exist -- empty db return @@ -745,21 +790,16 @@ def check_run_id_null(session) -> Iterable[str]: reason="with a NULL dag_id, run_id, or execution_date", ) return - _move_dangling_run_data_to_new_table(session, dagrun_table.name, dagrun_dangling_table_name) + _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name) -def _move_dangling_task_data_to_new_table(session, source_table, target_table): - where_clause = f""" - where (task_id, dag_id, execution_date) IN ( - select source.task_id, source.dag_id, source.execution_date - from {source_table} as source - left join dag_run as dr - on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date) - where dr.id is null - ) +def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str): + where_clause = """ + left join dag_run as dr + on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date) + where dr.id is null """ - session.execute(text(f"create table {target_table} as select * from {source_table} {where_clause}")) - session.execute(text(f"delete from {source_table} {where_clause}")) + _move_dangling_table(session, source_table, target_table_name, where_clause) def check_task_tables_without_matching_dagruns(session) -> Iterable[str]: @@ -770,7 +810,7 @@ def check_task_tables_without_matching_dagruns(session) -> Iterable[str]: models_to_dagrun = [TaskInstance, TaskReschedule] for model in models_to_dagrun + [DagRun]: try: - metadata.reflect(only=[model.__tablename__]) + metadata.reflect(only=[model.__tablename__], extend_existing=True, resolve_fks=False) except exc.InvalidRequestError: # Table doesn't exist, but try the other ones incase the user is upgrading from an _old_ DB # version @@ -821,7 +861,7 @@ def check_task_tables_without_matching_dagruns(session) -> Iterable[str]: ) errored = True continue - _move_dangling_task_data_to_new_table(session, source_table.name, dangling_table_name) + _move_dangling_task_data_to_new_table(session, source_table, dangling_table_name) if errored: session.rollback() @@ -842,6 +882,8 @@ def _check_migration_errors(session=None) -> Iterable[str]: check_task_tables_without_matching_dagruns, ): yield from check_fn(session) + # Ensure there is no "active" transaction. Seems odd, but without this MSSQL can hang + session.commit() @provide_session