ashb commented on code in PR #27506:
URL: https://github.com/apache/airflow/pull/27506#discussion_r1017746397
##########
airflow/models/mappedoperator.py:
##########
@@ -620,13 +620,19 @@ def expand_mapped_task(self, run_id: str, *, session:
Session) -> tuple[Sequence
try:
total_length =
self._get_specified_expand_input().get_total_map_length(run_id, session=session)
except NotFullyPopulated as e:
- self.log.info(
- "Cannot expand %r for run %s; missing upstream values: %s",
- self,
- run_id,
- sorted(e.missing),
- )
- total_length = None
+ if self.dag and self.dag.partial:
+                    # partial dags come from the mini scheduler. It's
+ # likely that the upstream tasks are not yet done,
+ # so we ignore this exception.
+ total_length = None
+ else:
+ self.log.error(
+ "Cannot expand %r for run %s; missing upstream values: %s",
+ self,
+ run_id,
+ sorted(e.missing),
+ )
+ total_length = None
Review Comment:
```suggestion
total_length = None
            # partial dags come from the mini scheduler. It's
# possible that the upstream tasks are not yet done,
# but we don't have upstream of upstreams in partial dags,
# so we ignore this exception.
if not self.dag or not self.dag.partial:
self.log.error(
"Cannot expand %r for run %s; missing upstream values:
%s",
self,
run_id,
sorted(e.missing),
)
```
##########
airflow/models/taskinstance.py:
##########
@@ -2459,6 +2459,65 @@ def ti_selector_condition(cls, vals: Collection[str |
tuple[str, int]]) -> Colum
return filters[0]
return or_(*filters)
+ @Sentry.enrich_errors
+ @provide_session
+ def schedule_downstream_tasks(self, session=None):
+ """
+ The mini-scheduler for scheduling downstream tasks of this task
instance
+        :meta private:
+ """
+ from sqlalchemy.exc import OperationalError
+
+ from airflow.models import DagRun
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(
+ session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ run_id=self.run_id,
+ ),
+ session=session,
+ ).one()
+
+ task = self.task
+ if TYPE_CHECKING:
+ assert task.dag
+
+ # Get a partial DAG with just the specific tasks we want to
examine.
+ # In order for dep checks to work correctly, we include ourself (so
+ # TriggerRuleDep can check the state of the task we just executed).
+ partial_dag = task.dag.partial_subset(
+ task.downstream_task_ids,
+ include_downstream=True,
+ include_upstream=False,
+ include_direct_upstream=True,
+ )
+
+ dag_run.dag = partial_dag
+ info = dag_run.task_instance_scheduling_decisions(session)
+
+ skippable_task_ids = {
+ task_id for task_id in partial_dag.task_ids if task_id not in
task.downstream_task_ids
+ }
+
+ schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id
not in skippable_task_ids]
+ for schedulable_ti in schedulable_tis:
+ if not hasattr(schedulable_ti, "task"):
+ schedulable_ti.task =
task.dag.get_task(schedulable_ti.task_id)
+
+ num = dag_run.schedule_tis(schedulable_tis)
Review Comment:
```suggestion
num = dag_run.schedule_tis(schedulable_tis, session=session)
```
##########
airflow/models/taskinstance.py:
##########
@@ -2459,6 +2459,65 @@ def ti_selector_condition(cls, vals: Collection[str |
tuple[str, int]]) -> Colum
return filters[0]
return or_(*filters)
+ @Sentry.enrich_errors
+ @provide_session
+ def schedule_downstream_tasks(self, session=None):
+ """
+ The mini-scheduler for scheduling downstream tasks of this task
instance
+        :meta private:
+ """
+ from sqlalchemy.exc import OperationalError
+
+ from airflow.models import DagRun
+
+ try:
+ # Re-select the row with a lock
+ dag_run = with_row_locks(
+ session.query(DagRun).filter_by(
+ dag_id=self.dag_id,
+ run_id=self.run_id,
+ ),
+ session=session,
+ ).one()
+
+ task = self.task
+ if TYPE_CHECKING:
+ assert task.dag
+
+ # Get a partial DAG with just the specific tasks we want to
examine.
+ # In order for dep checks to work correctly, we include ourself (so
+ # TriggerRuleDep can check the state of the task we just executed).
+ partial_dag = task.dag.partial_subset(
+ task.downstream_task_ids,
+ include_downstream=True,
+ include_upstream=False,
+ include_direct_upstream=True,
+ )
+
+ dag_run.dag = partial_dag
+ info = dag_run.task_instance_scheduling_decisions(session)
+
+ skippable_task_ids = {
+ task_id for task_id in partial_dag.task_ids if task_id not in
task.downstream_task_ids
+ }
+
+ schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id
not in skippable_task_ids]
+ for schedulable_ti in schedulable_tis:
+ if not hasattr(schedulable_ti, "task"):
+ schedulable_ti.task =
task.dag.get_task(schedulable_ti.task_id)
+
+ num = dag_run.schedule_tis(schedulable_tis)
+ self.log.info("%d downstream tasks scheduled from follow-on
schedule check", num)
Review Comment:
Just for good measure
```suggestion
self.log.info("%d downstream tasks scheduled from follow-on
schedule check", num)
session.flush()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]