jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847691219


##########
airflow/models/dagrun.py:
##########
@@ -697,8 +697,9 @@ def _get_ready_tis(
                 old_states[schedulable.key] = old_state
                 continue
 
-            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
-            # for any reason it wasn't, we need to expand it now
+            # This is called in two places: First (and ideally) is from the mini scheduler at the end of
+            # LocalTaskJob, and then as an "expansion of last resort" this is also called from the scheduler

Review Comment:
   ```suggestion
               # This is called in two places: First (and ideally) is in the mini scheduler at the end of
               # LocalTaskJob, and then as an "expansion of last resort" in the scheduler
   ```
   
   nit



##########
airflow/models/dagrun.py:
##########
@@ -838,14 +839,47 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(
+                        "Removing task '%s' as the map_index is longer than the literal mapping list (%s)",
+                        ti,
+                        num_mapped_tis,
+                    )
                     ti.state = State.REMOVED
                 elif ti.map_index < 0:
+                    self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
                     ti.state = State.REMOVED
+                else:
+                    self.log.info("Restoring mapped task '%s'", ti)
+                    Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+                    ti.state = State.NONE
+            else:
+                #  What if it is _now_ dynamically mapped, but wasn't before?
+                total_length = task.run_time_mapped_ti_count(self.run_id, session=session)
+
+                if total_length is None:
+                    # Not all upstreams finished, so we can't tell what should be here. Remove everying

Review Comment:
   ```suggestion
                       # Not all upstreams finished, so we can't tell what should be here. Remove everything.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to