uranusjr commented on a change in pull request #17030: URL: https://github.com/apache/airflow/pull/17030#discussion_r703175221
########## File path: airflow/utils/db.py ########## @@ -674,30 +673,73 @@ def check_conn_type_null(session=None) -> str: pass if n_nulls: - return ( + yield ( 'The conn_type column in the connection ' 'table must contain content.\n' 'Make sure you don\'t have null ' 'in the conn_type column.\n' f'Null conn_type conn_id: {list(n_nulls)}' ) - return '' + + +def check_task_tables_without_matching_dagruns(session) -> Iterable[str]: + from itertools import chain + + import sqlalchemy.schema + from sqlalchemy import and_, outerjoin + + from airflow.models.renderedtifields import RenderedTaskInstanceFields + from airflow.models.sensorinstance import SensorInstance + + metadata = sqlalchemy.schema.MetaData(session.bind) + models_to_dagrun = [TaskInstance, TaskFail, TaskReschedule] + models_to_ti = [ + RenderedTaskInstanceFields, + SensorInstance, + SlaMiss, + XCom, + ] + metadata.reflect(only=[model.__tablename__ for model in models_to_dagrun + models_to_ti]) + + for (model, target) in chain( + ((m, DagRun) for m in models_to_dagrun), ((m, TaskInstance) for m in models_to_ti) + ): + if 'run_id' in metadata.tables[model.__tablename__].columns: + # Migration already applied, don't check again + continue + + join_cond = and_(model.dag_id == target.dag_id, model.execution_date == target.execution_date) + if "task_id" in target.__table__.columns: + join_cond = and_(join_cond, model.task_id == target.task_id) + + query = ( + session.query(model.dag_id, model.task_id, model.execution_date) + .select_from(outerjoin(model, target, join_cond)) + .filter(target.dag_id.is_(None)) + ) # type: ignore + + num = query.count() + + if num > 0: + yield ( + f'The {model.__tablename__} table has {num} row{"s" if num != 1 else ""} without a ' + + f'corresponding {target.__tablename__} row. You must manually correct this problem ' + + '(possibly by deleting the problem rows).' 
+ )

Review comment:

```suggestion
            yield (
                f'The {model.__tablename__} table has {num} row{"s" if num != 1 else ""} without a '
                f'corresponding {target.__tablename__} row. You must manually correct this problem '
                '(possibly by deleting the problem rows).'
            )
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org