mik-laj commented on a change in pull request #7489: [AIRFLOW-6869][WIP] Bulk fetch DAGRuns for _process_task_instances
URL: https://github.com/apache/airflow/pull/7489#discussion_r383451630
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -684,11 +687,36 @@ def _process_dags(self, dagbag, dags, tis_out):
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
         :type dags: List[airflow.models.DAG]
-        :param tis_out: A list to add generated TaskInstance objects
-        :type tis_out: list[TaskInstance]
-        :rtype: None
+        :rtype: list[TaskInstance]
+        :return: A list of generated TaskInstance objects
         """
         check_slas = conf.getboolean('core', 'CHECK_SLAS', fallback=True)
+
+        tis_out = []
+        dag_ids = [dag.dag_id for dag in dags]
+        dag_runs = DagRun.find(dag_ids=dag_ids, state=State.RUNNING, session=session)
+        # list() is needed because of a bug in Python 3.7+
+        #
+        # The following code returns different values depending on the Python version
+        # from itertools import groupby
+        # from unittest.mock import MagicMock
+        # key = "key"
+        # item = MagicMock(attr=key)
+        # items = [item]
+        # items_by_attr = {k: v for k, v in groupby(items, lambda d: d.attr)}
+        # print("items_by_attr=", items_by_attr)
+        # item_with_key = list(items_by_attr[key]) if key in items_by_attr else []
+        # print("item_with_key=", item_with_key)
+        #
+        # Python 3.7+:
+        # items_by_attr= {'key': <itertools._grouper object at 0x7f3b9f38d4d0>}
+        # item_with_key= []
+        #
+        # Python 3.6:
+        # items_by_attr= {'key': <itertools._grouper object at 0x101128630>}
+        # item_with_key= [<MagicMock id='4310405416'>]

Review comment:
I will update the description during the next rebase.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services