jedcunningham commented on code in PR #22909: URL: https://github.com/apache/airflow/pull/22909#discussion_r847691219
########## airflow/models/dagrun.py: ########## @@ -697,8 +697,9 @@ def _get_ready_tis( old_states[schedulable.key] = old_state continue - # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if - # for any reason it wasn't, we need to expand it now + # This is called in two places: First (and ideally) is from the mini scheduler at the end of + # LocalTaskJob, and then as an "expansion of last resort" this is also called from the scheduler Review Comment: ```suggestion # This is called in two places: First (and ideally) is in the mini scheduler at the end of # LocalTaskJob, and then as an "expansion of last resort" in the scheduler ``` nit ########## airflow/models/dagrun.py: ########## @@ -838,14 +839,47 @@ def verify_integrity(self, session: Session = NEW_SESSION): ti.state = State.REMOVED continue - if task.is_mapped: - task = cast("MappedOperator", task) - num_mapped_tis = task.parse_time_mapped_ti_count - # Check if the number of mapped literals has changed and we need to mark this TI as removed - if not num_mapped_tis or ti.map_index >= num_mapped_tis: + if not task.is_mapped: + continue + task = cast("MappedOperator", task) + num_mapped_tis = task.parse_time_mapped_ti_count + # Check if the number of mapped literals has changed and we need to mark this TI as removed + if num_mapped_tis is not None: + if ti.map_index >= num_mapped_tis: + self.log.debug( + "Removing task '%s' as the map_index is longer than the literal mapping list (%s)", + ti, + num_mapped_tis, + ) ti.state = State.REMOVED elif ti.map_index < 0: + self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti) ti.state = State.REMOVED + else: + self.log.info("Restoring mapped task '%s'", ti) + Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1) + ti.state = State.NONE + else: + # What if it is _now_ dynamically mapped, but wasn't before? 
+ total_length = task.run_time_mapped_ti_count(self.run_id, session=session) + + if total_length is None: + # Not all upstreams finished, so we can't tell what should be here. Remove everying Review Comment: ```suggestion # Not all upstreams finished, so we can't tell what should be here. Remove everything. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org