kaxil commented on code in PR #62922:
URL: https://github.com/apache/airflow/pull/62922#discussion_r2956613798
##########
task-sdk/src/airflow/sdk/definitions/_internal/expandinput.py:
##########
@@ -79,6 +78,45 @@ def _needs_run_time_resolution(v: OperatorExpandArgument) ->
TypeGuard[MappedArg
return isinstance(v, (MappedArgument, XComArg))
+class ExpandInput(ABC, ResolveMixin):
Review Comment:
Changing `ExpandInput` from a `Union` type alias to an abstract class is a
semantic breaking change for any downstream code that relied on the union
(e.g., `isinstance(x, get_args(ExpandInput))` or type-narrowing). Since this is
in `_internal`, the blast radius is limited, but it's worth flagging —
especially for providers or third-party code that may have imported it.
##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -220,6 +220,13 @@ def event_loop() -> Generator[AbstractEventLoop]:
asyncio.set_event_loop(None)
+async def run_trigger(trigger: BaseTrigger) -> Any | None:
Review Comment:
`run_trigger` collects all events into a list but only returns the first
one. If a trigger yields multiple events, the rest are silently dropped. If
that's intentional, it would be cleaner to `return` the first event directly and
avoid accumulating a list:
```python
async def run_trigger(trigger: BaseTrigger) -> Any | None:
    async for event in trigger.run():
        return event
    return None
```
##########
task-sdk/tests/task_sdk/definitions/conftest.py:
##########
@@ -47,3 +49,10 @@ def run(dag: DAG, task_id: str, map_index: int):
raise RuntimeError("Unable to find call to TaskState")
return run
+
+
+def make_xcom_arg(values: Any) -> XComArg:
+ op = BaseOperator(task_id="upstream")
+ xcom_arg = XComArg(op)
+ patch.object(xcom_arg, "resolve", return_value=values).start()
Review Comment:
`patch.object(...).start()` without a matching `.stop()` or `addCleanup`
will leak the mock into subsequent tests. Consider using `monkeypatch`
(pytest-native) or storing the patcher and calling `.stop()` in a fixture with
proper teardown:
```python
def make_xcom_arg(values: Any, monkeypatch: pytest.MonkeyPatch) -> XComArg:
    op = BaseOperator(task_id="upstream")
    xcom_arg = XComArg(op)
    monkeypatch.setattr(xcom_arg, "resolve", lambda *a, **kw: values)
    return xcom_arg
```
##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1658,6 +1663,16 @@ def resume_execution(self, next_method: str,
next_kwargs: dict[str, Any] | None,
raise TaskDeferralError(error)
# Grab the callable off the Operator/Task and add in any kwargs
execute_callable = getattr(self, next_method)
+ if next_kwargs:
+ return partial(execute_callable, **next_kwargs)
+ return execute_callable
+
+ def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] |
None, context: Context):
+ """Entrypoint method called by the Task Runner (instead of execute)
when this task is resumed."""
+ if next_kwargs is None:
+ next_kwargs = {}
Review Comment:
Bug: `next_callable` already binds `**next_kwargs` via
`partial(execute_callable, **next_kwargs)` when kwargs are present, but
`resume_execution` on the next line still passes `**next_kwargs` again:
```python
return execute_callable(context, **next_kwargs)
```
When `next_kwargs` is non-empty, this calls `partial(fn, **kw)(context,
**kw)`, which raises `TypeError: got multiple values for keyword argument`.
The fix is likely:
```python
return execute_callable(context)
```
since `next_callable` already handles binding kwargs when they exist.
##########
task-sdk/src/airflow/sdk/execution_time/executor.py:
##########
@@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import contextvars
+import inspect
+import logging
+import time
+from asyncio import AbstractEventLoop, Semaphore
+from collections.abc import Callable
+from concurrent.futures import Future, ThreadPoolExecutor, as_completed
+from typing import TYPE_CHECKING, Any, cast
+
+from airflow.sdk import BaseAsyncOperator, BaseOperator, TaskInstanceState,
timezone
+from airflow.sdk.bases.operator import ExecutorSafeguard
+from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
+from airflow.sdk.exceptions import AirflowRescheduleTaskInstanceException,
TaskDeferred
+from airflow.sdk.execution_time.callback_runner import create_executable_runner
+from airflow.sdk.execution_time.context import context_get_outlet_events
+from airflow.sdk.execution_time.task_runner import (
+ RuntimeTaskInstance,
+ _execute_task,
+ _run_task_state_change_callbacks,
+)
+
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger as Logger
+
+ from airflow.sdk import Context
+ from airflow.sdk.execution_time.task_runner import MappedTaskInstance
+
+
+def collect_futures(loop: AbstractEventLoop, futures: list[Any]):
+ """Yield futures as they complete (sync or async)."""
+ yield from as_completed(f for f in futures if isinstance(f, Future))
+
+ async_tasks = [f for f in futures if isinstance(f, asyncio.Task)]
+
+ if async_tasks:
+ for task, _ in zip(
+ async_tasks,
+ loop.run_until_complete(asyncio.gather(*async_tasks,
return_exceptions=True)),
+ ):
+ yield task
+
+ return []
+
Review Comment:
`return []` in a generator function (one that uses `yield`) doesn't yield
anything — the list becomes `StopIteration.value`, which callers via
`for`/`list()` never see. This is dead code. Either remove it or, if the intent
was to return remaining items, convert the function to a non-generator that
returns a list.
##########
task-sdk/src/airflow/sdk/definitions/iterableoperator.py:
##########
@@ -0,0 +1,358 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import os
+from collections import deque
+from collections.abc import Iterable, Sequence
+from concurrent.futures import Future
+from itertools import chain
+from typing import TYPE_CHECKING, Any
+
+try:
+ # Python 3.12+
+ from itertools import batched # type: ignore[attr-defined]
+except ImportError:
+ from more_itertools import batched # type: ignore[no-redef]
+
+try:
+ # Python 3.11+
+ BaseExceptionGroup
+except NameError:
+ from exceptiongroup import BaseExceptionGroup
+
+from airflow.sdk import TaskInstanceState, timezone
+from airflow.sdk.bases.operator import BaseOperator,
DecoratedDeferredAsyncOperator, event_loop
+from airflow.sdk.definitions.xcom_arg import MapXComArg, XComArg # noqa: F401
+from airflow.sdk.exceptions import (
+ AirflowRescheduleTaskInstanceException,
+ AirflowTaskTimeout,
+ TaskDeferred,
+)
+from airflow.sdk.execution_time.context import context_to_airflow_vars
+from airflow.sdk.execution_time.executor import HybridExecutor, TaskExecutor,
collect_futures
+from airflow.sdk.execution_time.lazy_sequence import XComIterable
+from airflow.sdk.execution_time.task_runner import MappedTaskInstance
+
+if TYPE_CHECKING:
+ from airflow.sdk.definitions._internal.expandinput import ExpandInput
+ from airflow.sdk.definitions.context import Context
+ from airflow.sdk.definitions.mappedoperator import MappedOperator
+
+
+class IterableOperator(BaseOperator):
+ """Object representing an iterable operator in a DAG."""
+
+ _operator: MappedOperator
+ expand_input: ExpandInput
+ partial_kwargs: dict[str, Any]
+ # each operator should override this class attr for shallow copy attrs.
+ shallow_copy_attrs: Sequence[str] = (
+ "_operator",
+ "expand_input",
+ "partial_kwargs",
+ "_log",
+ )
+
+ def __init__(
+ self,
+ *,
+ operator: MappedOperator,
+ expand_input: ExpandInput,
+ task_concurrency: int | None = None,
+ **kwargs,
+ ):
+ super().__init__(
+ **{
+ **kwargs,
+ "task_id": operator.task_id,
+ "owner": operator.owner,
+ "email": operator.email,
+ "email_on_retry": operator.email_on_retry,
+ "email_on_failure": operator.email_on_failure,
+ "retries": 0, # We should not retry the IterableOperator,
only the mapped ti's should be retried
+ "retry_delay": operator.retry_delay,
+ "retry_exponential_backoff":
operator.retry_exponential_backoff,
+ "max_retry_delay": operator.max_retry_delay,
+ "start_date": operator.start_date,
+ "end_date": operator.end_date,
+ "depends_on_past": operator.depends_on_past,
+ "ignore_first_depends_on_past":
operator.ignore_first_depends_on_past,
+ "wait_for_past_depends_before_skipping":
operator.wait_for_past_depends_before_skipping,
+ "wait_for_downstream": operator.wait_for_downstream,
+ "dag": operator.dag,
+ "priority_weight": operator.priority_weight,
+ "queue": operator.queue,
+ "pool": operator.pool,
+ "pool_slots": operator.pool_slots,
+ "execution_timeout": None,
+ "trigger_rule": operator.trigger_rule,
+ "resources": operator.resources,
+ "run_as_user": operator.run_as_user,
+ "map_index_template": operator.map_index_template,
+ "max_active_tis_per_dag": operator.max_active_tis_per_dag,
+ "max_active_tis_per_dagrun":
operator.max_active_tis_per_dagrun,
+ "executor": operator.executor,
+ "executor_config": operator.executor_config,
+ "inlets": operator.inlets,
+ "outlets": operator.outlets,
+ "task_group": operator.task_group,
+ "doc": operator.doc,
+ "doc_md": operator.doc_md,
+ "doc_json": operator.doc_json,
+ "doc_yaml": operator.doc_yaml,
+ "doc_rst": operator.doc_rst,
+ "task_display_name": operator.task_display_name,
+ "allow_nested_operators": operator.allow_nested_operators,
+ }
+ )
+ self._operator = operator
+ self.expand_input = expand_input
+ self.partial_kwargs = operator.partial_kwargs or {}
+ self.max_workers = task_concurrency or os.cpu_count() or 1
+ self._number_of_tasks: int = 0
+ XComArg.apply_upstream_relationship(self, self.expand_input.value)
+
+ @property
+ def task_type(self) -> str:
+ """@property: type of the task."""
+ return self._operator.__class__.__name__
+
+ @property
+ def timeout(self) -> float | None:
+ if self.execution_timeout:
+ return self.execution_timeout.total_seconds()
+ return None
+
+ def _get_specified_expand_input(self) -> ExpandInput:
+ return self.expand_input
+
+ def _unmap_operator(self, mapped_kwargs: Context):
+ self._number_of_tasks += 1
+ return self._operator.unmap(mapped_kwargs)
+
+ def _xcom_push(self, context: Context, task: MappedTaskInstance, value:
Any) -> None:
+ self.log.debug("Pushing XCom %s", task.xcom_key)
+
+ context["ti"].xcom_push(key=task.xcom_key, value=value)
+
+ def _run_tasks(
+ self,
+ context: Context,
+ tasks: Iterable[MappedTaskInstance],
+ ) -> XComIterable | None:
+ exceptions: list[BaseException] = []
+ reschedule_date = timezone.utcnow()
+ prev_futures_count = 0
+ futures: dict[Future, MappedTaskInstance] = {}
+ deferred_tasks: deque[MappedTaskInstance] = deque()
+ failed_tasks: deque[MappedTaskInstance] = deque()
+ chunked_tasks = batched(tasks, self.max_workers)
+ do_xcom_push = True
+
+ self.log.info("Running tasks with %d workers", self.max_workers)
+
+ # Export context in os.environ to make it available for operators to
use.
+ airflow_context_vars = context_to_airflow_vars(context,
in_env_var_format=True)
+ os.environ.update(airflow_context_vars)
+
Review Comment:
`os.environ.update(airflow_context_vars)` mutates the global process
environment while child threads may be running concurrently (via
`HybridExecutor`/`ThreadPoolExecutor`). This is a data race — `os.environ` is
not thread-safe for concurrent reads and writes.
Consider passing context vars through the `context` dict or using
thread-local storage instead of mutating the shared environment.
##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1731,6 +1746,50 @@ def execute(self, context):
return loop.run_until_complete(self.aexecute(context))
+class DecoratedDeferredAsyncOperator(BaseAsyncOperator):
+ """
+ A decorator operator that wraps another deferred BaseOperator instance.
+
+ Implements the async aexecute() method while delegating all other behavior.
+ """
+
+ def __init__(self, *, operator: BaseOperator, task_deferred: TaskDeferred,
**kwargs: Any):
+ super().__init__(task_id=operator.task_id, **kwargs)
+ self._operator = operator
+ self._task_deferred = task_deferred
+
+ async def aexecute(self, context):
+ from airflow.sdk.execution_time.callback_runner import
create_executable_runner
+ from airflow.sdk.execution_time.context import
context_get_outlet_events
+
+ event = await run_trigger(self._task_deferred.trigger)
+
+ self.log.debug("event: %s", event)
+
+ if event:
+ self.log.debug("next_method: %s", self._task_deferred.method_name)
+
+ if self._task_deferred.method_name:
+ try:
+ next_method = self._operator.next_callable(
+ self._task_deferred.method_name,
+ self._task_deferred.kwargs,
+ )
+
+ outlet_events = context_get_outlet_events(context)
+ runner = create_executable_runner(
+ func=next_method,
+ outlet_events=outlet_events,
+ logger=self.log,
+ )
+ return runner.run(context, event.payload)
+ except TaskDeferred as task_deferred:
+ self._task_deferred = task_deferred
+ # Recursively handle nested deferrals
+ return await self.aexecute(context=context)
Review Comment:
This is unbounded recursion. If the callback keeps raising `TaskDeferred`,
Python will hit its recursion limit. A `while True` loop that uses `continue` to
re-run the trigger after each nested deferral would be safer:
```python
while True:
    event = await run_trigger(self._task_deferred.trigger)
    if not event:
        return None
    if not self._task_deferred.method_name:
        return None
    try:
        ...
        return runner.run(context, event.payload)
    except TaskDeferred as td:
        self._task_deferred = td
        continue
```
##########
task-sdk/src/airflow/sdk/definitions/iterableoperator.py:
##########
@@ -0,0 +1,358 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import os
+from collections import deque
+from collections.abc import Iterable, Sequence
+from concurrent.futures import Future
+from itertools import chain
+from typing import TYPE_CHECKING, Any
+
+try:
+ # Python 3.12+
+ from itertools import batched # type: ignore[attr-defined]
+except ImportError:
+ from more_itertools import batched # type: ignore[no-redef]
+
+try:
+ # Python 3.11+
+ BaseExceptionGroup
+except NameError:
+ from exceptiongroup import BaseExceptionGroup
+
+from airflow.sdk import TaskInstanceState, timezone
+from airflow.sdk.bases.operator import BaseOperator,
DecoratedDeferredAsyncOperator, event_loop
+from airflow.sdk.definitions.xcom_arg import MapXComArg, XComArg # noqa: F401
+from airflow.sdk.exceptions import (
+ AirflowRescheduleTaskInstanceException,
+ AirflowTaskTimeout,
+ TaskDeferred,
+)
+from airflow.sdk.execution_time.context import context_to_airflow_vars
+from airflow.sdk.execution_time.executor import HybridExecutor, TaskExecutor,
collect_futures
+from airflow.sdk.execution_time.lazy_sequence import XComIterable
+from airflow.sdk.execution_time.task_runner import MappedTaskInstance
+
+if TYPE_CHECKING:
+ from airflow.sdk.definitions._internal.expandinput import ExpandInput
+ from airflow.sdk.definitions.context import Context
+ from airflow.sdk.definitions.mappedoperator import MappedOperator
+
+
+class IterableOperator(BaseOperator):
+ """Object representing an iterable operator in a DAG."""
+
+ _operator: MappedOperator
+ expand_input: ExpandInput
+ partial_kwargs: dict[str, Any]
+ # each operator should override this class attr for shallow copy attrs.
+ shallow_copy_attrs: Sequence[str] = (
+ "_operator",
+ "expand_input",
+ "partial_kwargs",
+ "_log",
+ )
+
+ def __init__(
+ self,
+ *,
+ operator: MappedOperator,
+ expand_input: ExpandInput,
+ task_concurrency: int | None = None,
+ **kwargs,
+ ):
+ super().__init__(
+ **{
+ **kwargs,
+ "task_id": operator.task_id,
+ "owner": operator.owner,
+ "email": operator.email,
+ "email_on_retry": operator.email_on_retry,
+ "email_on_failure": operator.email_on_failure,
+ "retries": 0, # We should not retry the IterableOperator,
only the mapped ti's should be retried
+ "retry_delay": operator.retry_delay,
+ "retry_exponential_backoff":
operator.retry_exponential_backoff,
+ "max_retry_delay": operator.max_retry_delay,
+ "start_date": operator.start_date,
+ "end_date": operator.end_date,
+ "depends_on_past": operator.depends_on_past,
+ "ignore_first_depends_on_past":
operator.ignore_first_depends_on_past,
+ "wait_for_past_depends_before_skipping":
operator.wait_for_past_depends_before_skipping,
+ "wait_for_downstream": operator.wait_for_downstream,
+ "dag": operator.dag,
+ "priority_weight": operator.priority_weight,
+ "queue": operator.queue,
+ "pool": operator.pool,
+ "pool_slots": operator.pool_slots,
+ "execution_timeout": None,
+ "trigger_rule": operator.trigger_rule,
+ "resources": operator.resources,
+ "run_as_user": operator.run_as_user,
+ "map_index_template": operator.map_index_template,
+ "max_active_tis_per_dag": operator.max_active_tis_per_dag,
+ "max_active_tis_per_dagrun":
operator.max_active_tis_per_dagrun,
+ "executor": operator.executor,
+ "executor_config": operator.executor_config,
+ "inlets": operator.inlets,
+ "outlets": operator.outlets,
+ "task_group": operator.task_group,
+ "doc": operator.doc,
+ "doc_md": operator.doc_md,
+ "doc_json": operator.doc_json,
+ "doc_yaml": operator.doc_yaml,
+ "doc_rst": operator.doc_rst,
+ "task_display_name": operator.task_display_name,
+ "allow_nested_operators": operator.allow_nested_operators,
+ }
+ )
+ self._operator = operator
+ self.expand_input = expand_input
+ self.partial_kwargs = operator.partial_kwargs or {}
+ self.max_workers = task_concurrency or os.cpu_count() or 1
+ self._number_of_tasks: int = 0
+ XComArg.apply_upstream_relationship(self, self.expand_input.value)
+
+ @property
+ def task_type(self) -> str:
+ """@property: type of the task."""
+ return self._operator.__class__.__name__
+
+ @property
+ def timeout(self) -> float | None:
+ if self.execution_timeout:
+ return self.execution_timeout.total_seconds()
+ return None
+
+ def _get_specified_expand_input(self) -> ExpandInput:
+ return self.expand_input
+
+ def _unmap_operator(self, mapped_kwargs: Context):
+ self._number_of_tasks += 1
+ return self._operator.unmap(mapped_kwargs)
+
+ def _xcom_push(self, context: Context, task: MappedTaskInstance, value:
Any) -> None:
+ self.log.debug("Pushing XCom %s", task.xcom_key)
+
+ context["ti"].xcom_push(key=task.xcom_key, value=value)
+
+ def _run_tasks(
+ self,
+ context: Context,
+ tasks: Iterable[MappedTaskInstance],
+ ) -> XComIterable | None:
+ exceptions: list[BaseException] = []
+ reschedule_date = timezone.utcnow()
+ prev_futures_count = 0
+ futures: dict[Future, MappedTaskInstance] = {}
+ deferred_tasks: deque[MappedTaskInstance] = deque()
+ failed_tasks: deque[MappedTaskInstance] = deque()
+ chunked_tasks = batched(tasks, self.max_workers)
+ do_xcom_push = True
+
+ self.log.info("Running tasks with %d workers", self.max_workers)
+
+ # Export context in os.environ to make it available for operators to
use.
+ airflow_context_vars = context_to_airflow_vars(context,
in_env_var_format=True)
+ os.environ.update(airflow_context_vars)
+
+ with event_loop() as loop:
+ with HybridExecutor(loop=loop, max_workers=self.max_workers) as
executor:
+ for task in next(chunked_tasks, []):
+ do_xcom_push = task.do_xcom_push
+ if task.is_async:
+ future = executor.submit(self._run_async_operator,
context, task)
+ else:
+ future = executor.submit(self._run_operator, context,
task)
+ futures[future] = task
+
+ while futures:
+ futures_count = len(futures)
+
+ if futures_count != prev_futures_count:
+ self.log.info("Number of remaining futures: %s",
futures_count)
+ prev_futures_count = futures_count
+
+ for future in collect_futures(loop, list(futures.keys())):
+ task = futures.pop(future)
+
+ try:
+ if isinstance(future, asyncio.futures.Future):
+ result = future.result()
+ else:
+ result = future.result(timeout=self.timeout)
+
+ self.log.debug("result: %s", result)
+
+ if result is not None and task.do_xcom_push:
+ self._xcom_push(
+ context=context,
+ task=task,
+ value=result,
+ )
+ except TaskDeferred as task_deferred:
+ operator = DecoratedDeferredAsyncOperator(
+ operator=task.task, task_deferred=task_deferred
+ )
+ # map_index is guaranteed to be int in
MappedTaskInstance due to validation in __init__
+ deferred_tasks.append(
+ self._create_mapped_task(
+ run_id=task.run_id,
+ map_index=task.map_index, # type:
ignore[arg-type]
+ try_number=task.try_number,
+ operator=operator,
+ )
+ )
+ except asyncio.TimeoutError as e:
+ self.log.warning("A timeout occurred for task_id
%s", task.task_id)
+ if task.next_try_number > (self.retries or 0):
+ exceptions.append(AirflowTaskTimeout(e))
+ else:
+ reschedule_date = min(reschedule_date,
task.next_retry_datetime())
+ failed_tasks.append(task)
+ except AirflowRescheduleTaskInstanceException as e:
+ reschedule_date = min(reschedule_date,
e.reschedule_date)
+ self.log.warning(
+ "An exception occurred for task_id %s with
map_index %s, it has been rescheduled at %s",
+ task.task_id,
+ task.map_index,
+ reschedule_date,
+ )
+ failed_tasks.append(e.task)
+ except Exception as e:
+ self.log.error(
+ "An exception occurred for task_id %s with
map_index %s",
+ task.task_id,
+ task.map_index,
+ )
+ exceptions.append(e)
+
+ if len(futures) < self.max_workers:
+ chunked_tasks = chain(deferred_tasks, chunked_tasks)
+ deferred_tasks.clear()
Review Comment:
Bug: `chain(deferred_tasks, chunked_tasks)` captures the deque by reference,
but `deferred_tasks.clear()` on the next line empties it immediately — before
the chained iterator is consumed. The deferred tasks are lost.
Repro:
```python
d = deque([1, 2, 3])
c = chain(d, iter([4, 5]))
d.clear()
list(c) # [4, 5] — items 1, 2, 3 are gone
```
Fix: snapshot the deque before clearing:
```python
chunked_tasks = chain(list(deferred_tasks), chunked_tasks)
deferred_tasks.clear()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]