uranusjr commented on a change in pull request #17030:
URL: https://github.com/apache/airflow/pull/17030#discussion_r703175221



##########
File path: airflow/utils/db.py
##########
@@ -674,30 +673,73 @@ def check_conn_type_null(session=None) -> str:
         pass
 
     if n_nulls:
-        return (
+        yield (
             'The conn_type column in the connection '
             'table must contain content.\n'
             'Make sure you don\'t have null '
             'in the conn_type column.\n'
             f'Null conn_type conn_id: {list(n_nulls)}'
         )
-    return ''
+
+
+def check_task_tables_without_matching_dagruns(session) -> Iterable[str]:
+    from itertools import chain
+
+    import sqlalchemy.schema
+    from sqlalchemy import and_, outerjoin
+
+    from airflow.models.renderedtifields import RenderedTaskInstanceFields
+    from airflow.models.sensorinstance import SensorInstance
+
+    metadata = sqlalchemy.schema.MetaData(session.bind)
+    models_to_dagrun = [TaskInstance, TaskFail, TaskReschedule]
+    models_to_ti = [
+        RenderedTaskInstanceFields,
+        SensorInstance,
+        SlaMiss,
+        XCom,
+    ]
+    metadata.reflect(only=[model.__tablename__ for model in models_to_dagrun + models_to_ti])
+
+    for (model, target) in chain(
+        ((m, DagRun) for m in models_to_dagrun), ((m, TaskInstance) for m in models_to_ti)
+    ):
+        if 'run_id' in metadata.tables[model.__tablename__].columns:
+            # Migration already applied, don't check again
+            continue
+
+        join_cond = and_(model.dag_id == target.dag_id, model.execution_date == target.execution_date)
+        if "task_id" in target.__table__.columns:
+            join_cond = and_(join_cond, model.task_id == target.task_id)
+
+        query = (
+            session.query(model.dag_id, model.task_id, model.execution_date)
+            .select_from(outerjoin(model, target, join_cond))
+            .filter(target.dag_id.is_(None))
+        )  # type: ignore
+
+        num = query.count()
+
+        if num > 0:
+            yield (
+                f'The {model.__tablename__} table has {num} row{"s" if num != 1 else ""} without a '
+                + f'corresponding {target.__tablename__} row. You must manually correct this problem '
+                + '(possibly by deleting the problem rows).'
+            )

Review comment:
       ```suggestion
               yield (
                   f'The {model.__tablename__} table has {num} row{"s" if num != 1 else ""} without a '
                   f'corresponding {target.__tablename__} row. You must manually correct this problem '
                   '(possibly by deleting the problem rows).'
               )
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to