jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847691219
##
airflow/models/dagrun.py:
##
@@ -697,8 +697,9 @@ def _get_ready_tis(
old_states[schedulable.key] = old_state
continue
-# Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
-# for any reason it wasn't, we need to expand it now
+# This is called in two places: First (and ideally) is from the mini scheduler at the end of
+# LocalTaskJob, and then as an "expansion of last resort" this is also called from the scheduler
Review Comment:
```suggestion
# This is called in two places: First (and ideally) is in the mini scheduler at the end of
# LocalTaskJob, and then as an "expansion of last resort" in the scheduler
```
nit
##
airflow/models/dagrun.py:
##
@@ -838,14 +839,47 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
-if task.is_mapped:
-task = cast("MappedOperator", task)
-num_mapped_tis = task.parse_time_mapped_ti_count
-# Check if the number of mapped literals has changed and we need to mark this TI as removed
-if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+if not task.is_mapped:
+continue
+task = cast("MappedOperator", task)
+num_mapped_tis = task.parse_time_mapped_ti_count
+# Check if the number of mapped literals has changed and we need to mark this TI as removed
+if num_mapped_tis is not None:
+if ti.map_index >= num_mapped_tis:
+self.log.debug(
+"Removing task '%s' as the map_index is longer than the literal mapping list (%s)",
+ti,
+num_mapped_tis,
+)
ti.state = State.REMOVED
elif ti.map_index < 0:
+self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
ti.state = State.REMOVED
+else:
+self.log.info("Restoring mapped task '%s'", ti)
+Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+ti.state = State.NONE
+else:
+# What if it is _now_ dynamically mapped, but wasn't before?
+total_length = task.run_time_mapped_ti_count(self.run_id, session=session)
+
+if total_length is None:
+# Not all upstreams finished, so we can't tell what should be here. Remove everying
Review Comment:
```suggestion
# Not all upstreams finished, so we can't tell what should be here. Remove everything.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org