jason810496 commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3248659978
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly
works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some
common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
+— one SQS queue, one Kafka topic, one directory of flag files — the triggerer would otherwise spin up one independent
+poll loop per trigger. For a shared queue with twenty subscribers that means twenty consumers, twenty connections,
+twenty sets of API calls per cadence.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared
+ upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
+ (the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer
+ drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
+ trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the
+ broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+ (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+ class DirectoryFileDeleteTrigger(BaseEventTrigger):
+ def __init__(self, *, directory, filename, poke_interval=5.0):
+ super().__init__()
+ self.directory = directory
+ self.filename = filename
+ self.poke_interval = poke_interval
+
+ def shared_stream_key(self):
Review Comment:
```suggestion
def shared_stream_key(self) -> Hashable:
```
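The `Hashable` hint is static only. Keys are looked up in `SharedStreamManager._groups` (presumably a dict keyed by the stream key), so they must also hash at runtime. The plain-Python check below shows why "typically a tuple of strings" matters: a tuple passes `isinstance(..., Hashable)` even when it contains a list, yet hashing it still fails. `is_usable_key` is a hypothetical helper, not an Airflow API.

```python
from collections.abc import Hashable


def is_usable_key(key: object) -> bool:
    """Hypothetical check: True only if ``key`` can really serve as a grouping key."""
    if key is None:
        # None is the opt-out value, not a grouping key.
        return False
    try:
        hash(key)
    except TypeError:
        return False
    return True


good = ("directory-scan", "/data/inbox", 5.0)
bad = ("directory-scan", ["/data/inbox"], 5.0)

# tuple defines __hash__, so the ABC check passes for both...
print(isinstance(good, Hashable), isinstance(bad, Hashable))  # True True
# ...but only the first one actually hashes.
print(is_usable_key(good), is_usable_key(bad))  # True False
```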
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly
works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some
common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
+— one SQS queue, one Kafka topic, one directory of flag files — the triggerer would otherwise spin up one independent
+poll loop per trigger. For a shared queue with twenty subscribers that means twenty consumers, twenty connections,
+twenty sets of API calls per cadence.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared
+ upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
+ (the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer
+ drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
+ trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the
+ broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+ (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+ class DirectoryFileDeleteTrigger(BaseEventTrigger):
+ def __init__(self, *, directory, filename, poke_interval=5.0):
+ super().__init__()
+ self.directory = directory
+ self.filename = filename
+ self.poke_interval = poke_interval
+
+ def shared_stream_key(self):
+ # All triggers on the same directory + cadence share one scan.
+ return ("directory-scan", self.directory, self.poke_interval)
+
+ @classmethod
+ async def open_shared_stream(cls, kwargs):
Review Comment:
```suggestion
async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:
```
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared underlying I/O between :class:`BaseEventTrigger` instances in the triggerer.
+
+When multiple triggers declare the same non-``None``
+:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`, the
+triggerer routes them through :class:`SharedStreamManager` so that one
+underlying poll loop produces raw events that are broadcast to every
+participating trigger. Each trigger then runs
+:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` to
+convert the broadcast into its own :class:`~airflow.triggers.base.TriggerEvent`
+instances. Triggers that opt out (the default) keep their independent
+``run()``-based poll loops untouched.
+
+Lifecycle invariants
+--------------------
+
+The manager and groups cooperate to keep a single invariant true at every
+``await``-point:
+
+ A key is present in :attr:`SharedStreamManager._groups` only while its
+ group's poll task is alive and accepting new subscribers.
+
+This rules out the late-subscriber races that the naive design admits — a
+new subscriber for a key whose poll has died or is in the middle of being
+torn down always falls through to "create a fresh group" rather than
+attaching to a dead one and hanging on an empty queue. The invariant is
+maintained synchronously:
+
+* When ``_poll`` ends for any reason other than cancellation (the upstream
+ iterator raised, or returned), the group's ``finally`` block evicts the
+ key from ``_groups`` and broadcasts a terminal sentinel to current
+ subscribers — all without yielding, so no other coroutine can interleave.
+* When the last subscriber leaves, :meth:`SharedStreamManager.unsubscribe`
+ evicts the key from ``_groups`` *before* awaiting ``group.stop()``, so a
+ new subscriber arriving while we wait for cancellation creates a fresh
+ group.
+* :meth:`SharedStreamManager.stop_all` clears ``_groups`` in one synchronous
+ step before awaiting any stop, applying the same rule to shutdown.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator, Callable, Hashable
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+if TYPE_CHECKING:
+ from structlog.stdlib import BoundLogger
+
+ from airflow.triggers.base import BaseEventTrigger
+
+log = structlog.get_logger(__name__)
+
+
+class _PollTerminated(Exception):
+ """
+ Raised inside subscribers when ``open_shared_stream`` returns without yielding more events.
+
+ Implementations are expected to run for the lifetime of the group; an
+ early return would otherwise leave subscribers waiting forever on an
+ empty queue.
+ """
+
+
+class _PollFailure:
+ """Sentinel propagated through subscriber queues when the shared poll ends."""
+
+ __slots__ = ("exc",)
+
+ def __init__(self, exc: BaseException) -> None:
+ self.exc = exc
+
+
+async def _drain(queue: asyncio.Queue) -> AsyncIterator[Any]:
+ """
+ Yield items from ``queue`` until a poll termination sentinel arrives.
+
+ Subscribers exit either by their consuming task being cancelled
+ (Airflow's standard idiom — :class:`CancelledError` propagates through
+ ``queue.get()``) or by the shared poll ending, in which case the
+ :class:`_PollFailure` sentinel re-raises here.
+ """
+ while True:
+ item = await queue.get()
+ if isinstance(item, _PollFailure):
+ raise item.exc
+ yield item
+
+
+class _SharedStreamGroup:
+ """One shared poll loop broadcasting raw events to N subscriber queues."""
+
+ def __init__(
+ self,
+ *,
+ key: Hashable,
+ trigger_class: type[BaseEventTrigger],
+ kwargs: dict[str, Any],
+ on_poll_terminate: Callable[[_SharedStreamGroup], None],
+ log: BoundLogger,
+ ) -> None:
+ self.key = key
+ self.trigger_class = trigger_class
+ self.kwargs = kwargs
+ self.log = log
+ self._on_poll_terminate = on_poll_terminate
+ self._subscribers: dict[int, asyncio.Queue] = {}
+ self._poll_task: asyncio.Task | None = None
+
+ def start(self) -> None:
+ """Start the underlying poll loop. Call exactly once per group."""
+ if self._poll_task is not None:
+ raise RuntimeError(f"Shared stream group {self.key!r} already started")
+ self._poll_task = asyncio.create_task(
+ self._poll(),
+ name=f"shared-stream-poll[{self.key!r}]",
+ )
+
+ async def _poll(self) -> None:
+ terminal_exc: BaseException | None = None
+ try:
+ async for raw_event in self.trigger_class.open_shared_stream(self.kwargs):
+ for queue in self._subscribers.values():
+ queue.put_nowait(raw_event)
+ terminal_exc = _PollTerminated(
+ f"open_shared_stream for {self.key!r} returned without raising; "
+ "shared streams are expected to run for the lifetime of the group"
+ )
+ except asyncio.CancelledError:
+ # ``stop()`` initiated this; the manager has already evicted the
+ # group and is awaiting our exit. Do not run the terminate path.
+ raise
+ except Exception as exc:
+ terminal_exc = exc
+ self.log.exception("Shared stream poll failed; propagating to subscribers", key=self.key)
+ finally:
+ if terminal_exc is not None:
+ # Synchronous: evict from the manager and broadcast the
+ # sentinel before returning to the loop, so no coroutine can
+ # observe ``_groups[key]`` pointing at a dead poll.
+ self._on_poll_terminate(self)
+ failure = _PollFailure(terminal_exc)
+ for queue in self._subscribers.values():
+ queue.put_nowait(failure)
+
+ def subscribe(self, trigger_id: int) -> AsyncIterator[Any]:
+ """Register ``trigger_id`` as a subscriber and return its raw event stream."""
+ if trigger_id in self._subscribers:
+ raise RuntimeError(f"Trigger {trigger_id} already subscribed to shared stream {self.key!r}")
+ queue: asyncio.Queue = asyncio.Queue()
Review Comment:
Each subscriber gets an unbounded `asyncio.Queue` (no `maxsize`), and the
producer broadcasts with `put_nowait` in `_poll` above. If
`filter_shared_stream` is slow relative to the upstream cadence (e.g. a busy
Kafka topic or a high-frequency SQS poll), the slow subscriber's queue grows
without bound and we have a memory leak that's invisible until OOM.
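One possible mitigation, sketched with plain `asyncio`, is to give each subscriber queue a `maxsize` and choose an explicit overflow policy. This sketch drops the oldest buffered event and returns a drop count so the producer can log it. `broadcast_bounded` and `MAX_BUFFERED` are hypothetical names. Dropping is plausibly tolerable for snapshot-style upstreams such as a directory listing, but not for a queue where every message matters.

```python
import asyncio

MAX_BUFFERED = 3  # hypothetical per-subscriber cap


def broadcast_bounded(queues: dict[int, asyncio.Queue], event: object) -> int:
    """Put ``event`` on every subscriber queue without blocking the producer.

    When a queue is full, its oldest buffered event is discarded to make room.
    Returns how many events were discarded on this call, so the caller can log it.
    """
    dropped = 0
    for queue in queues.values():
        if queue.full():
            queue.get_nowait()  # evict the oldest event
            dropped += 1
        queue.put_nowait(event)
    return dropped


async def main() -> tuple[list[object], list[object], int]:
    # Subscriber 1 keeps up; subscriber 2 never reads, like a stalled filter.
    queues = {1: asyncio.Queue(maxsize=MAX_BUFFERED), 2: asyncio.Queue(maxsize=MAX_BUFFERED)}
    fast_seen: list[object] = []
    total_dropped = 0
    for n in range(5):
        total_dropped += broadcast_bounded(queues, n)
        fast_seen.append(queues[1].get_nowait())
    slow_buffer = [queues[2].get_nowait() for _ in range(queues[2].qsize())]
    return fast_seen, slow_buffer, total_dropped


print(asyncio.run(main()))  # ([0, 1, 2, 3, 4], [2, 3, 4], 2)
```

The alternative, a blocking `await queue.put()`, trades memory for backpressure on the whole group. That is the trade-off raised in the design question on `SharedStreamManager`.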
##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly
works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some
common code.
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
+— one SQS queue, one Kafka topic, one directory of flag files — the triggerer would otherwise spin up one independent
+poll loop per trigger. For a shared queue with twenty subscribers that means twenty consumers, twenty connections,
+twenty sets of API calls per cadence.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared
+ upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
+ (the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer
+ drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
+ trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the
+ broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+ (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+ from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+ class DirectoryFileDeleteTrigger(BaseEventTrigger):
+ def __init__(self, *, directory, filename, poke_interval=5.0):
+ super().__init__()
+ self.directory = directory
+ self.filename = filename
+ self.poke_interval = poke_interval
+
+ def shared_stream_key(self):
+ # All triggers on the same directory + cadence share one scan.
+ return ("directory-scan", self.directory, self.poke_interval)
+
+ @classmethod
+ async def open_shared_stream(cls, kwargs):
+ # Drives one directory listing loop per group.
+ ...
+
+ async def filter_shared_stream(self, shared_stream):
Review Comment:
```suggestion
async def filter_shared_stream(self, shared_stream: AsyncIterator[Any]) -> AsyncIterator[TriggerEvent]:
```
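With the three annotation suggestions applied, the docs example also needs `from collections.abc import AsyncIterator, Hashable` and `from typing import Any` next to its existing import. Below is a self-contained sketch of how the elided hook bodies might look. It does not import Airflow: `TriggerEvent` and `BaseEventTrigger` are minimal stand-ins. The raw event (a `frozenset` of filenames per scan) and the delete-on-fire behaviour (inferred from the class name) are assumptions, not the PR's implementation. It follows the docs' rule that `open_shared_stream` reads only kwargs that participate in `shared_stream_key`.

```python
import asyncio
import os
from collections.abc import AsyncIterator, Hashable
from contextlib import suppress
from dataclasses import dataclass
from typing import Any


@dataclass
class TriggerEvent:
    """Stand-in for airflow.triggers.base.TriggerEvent (assumed shape)."""

    payload: Any


class BaseEventTrigger:
    """Stand-in so the sketch runs without Airflow installed."""


class DirectoryFileDeleteTrigger(BaseEventTrigger):
    def __init__(self, *, directory: str, filename: str, poke_interval: float = 5.0) -> None:
        super().__init__()
        self.directory = directory
        self.filename = filename
        self.poke_interval = poke_interval

    def shared_stream_key(self) -> Hashable:
        # All triggers on the same directory + cadence share one scan.
        return ("directory-scan", self.directory, self.poke_interval)

    @classmethod
    async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:
        # Reads only fields that are part of shared_stream_key, never "filename",
        # because any one subscriber's kwargs may end up driving the group.
        directory, interval = kwargs["directory"], kwargs["poke_interval"]
        while True:
            # Assumed raw event: the set of names present at this scan.
            yield frozenset(os.listdir(directory))
            await asyncio.sleep(interval)

    async def filter_shared_stream(self, shared_stream: AsyncIterator[Any]) -> AsyncIterator[TriggerEvent]:
        async for names in shared_stream:
            if self.filename not in names:
                continue
            path = os.path.join(self.directory, self.filename)
            # Per-trigger side effect: delete this trigger's own flag file,
            # tolerating it vanishing between the scan and the delete.
            with suppress(FileNotFoundError):
                os.remove(path)
            yield TriggerEvent({"path": path})
```

Two instances with the same directory and cadence but different `filename` values produce equal keys, so they would share one listing loop while filtering independently.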
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1527,6 +1556,9 @@ async def run_trigger(
# fine, the cleanup process will understand that, but we want to
# allow triggers a chance to cleanup, either in that case or if
# they exit cleanly. Exception from cleanup methods are ignored.
+ if shared_key is not None:
+ with suppress(Exception):
Review Comment:
`with suppress(Exception)` silently swallows any error from `unsubscribe`,
including ones we'd want to know about (e.g. an `asyncio.CancelledError`
propagation bug, or a programming error in `_handle_poll_terminate`). The
cleanup is best-effort, but a bare suppress with no log makes those bugs
invisible.
Suggest:
```python
if shared_key is not None:
    try:
        await self._shared_streams.unsubscribe(trigger_id, shared_key)
    except Exception:
        self.log.exception(
            "Failed to unsubscribe trigger from shared stream",
            trigger_id=trigger_id,
            key=shared_key,
        )
```
The same argument applies to the `trigger.cleanup()` suppress two lines down,
but that's pre-existing behavior, so I'm happy to leave it out of scope for this PR.
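For reference on what `suppress(Exception)` actually hides: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so a genuine cancellation already passes through `suppress(Exception)`. The invisible failures are ordinary exceptions such as a `RuntimeError` from a programming bug. The sketch contrasts the current shape with the suggested log-and-continue shape. `flaky_unsubscribe` is a hypothetical stand-in for `unsubscribe`, and stdlib `logging` stands in for the triggerer's structlog logger.

```python
import asyncio
import logging
from contextlib import suppress

log = logging.getLogger("triggerer-sketch")


async def flaky_unsubscribe(exc: BaseException) -> None:
    """Hypothetical stand-in for SharedStreamManager.unsubscribe that fails."""
    raise exc


async def unsubscribe_suppressed(exc: BaseException) -> str:
    with suppress(Exception):
        await flaky_unsubscribe(exc)
    return "swallowed"  # reached with no trace when exc is an Exception


async def unsubscribe_logged(exc: BaseException) -> str:
    try:
        await flaky_unsubscribe(exc)
    except Exception:
        # Still best-effort, but the traceback is recorded.
        log.exception("Failed to unsubscribe trigger from shared stream")
        return "logged"
    return "ok"


async def main() -> list[str]:
    results = [
        await unsubscribe_suppressed(RuntimeError("bug")),
        await unsubscribe_logged(RuntimeError("bug")),
    ]
    try:
        await unsubscribe_suppressed(asyncio.CancelledError())
    except asyncio.CancelledError:
        # CancelledError is a BaseException, so suppress(Exception) lets it through.
        results.append("cancellation propagated")
    return results
```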
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared underlying I/O between :class:`BaseEventTrigger` instances in the triggerer.
+
+When multiple triggers declare the same non-``None``
+:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`, the
+triggerer routes them through :class:`SharedStreamManager` so that one
+underlying poll loop produces raw events that are broadcast to every
+participating trigger. Each trigger then runs
+:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` to
+convert the broadcast into its own :class:`~airflow.triggers.base.TriggerEvent`
+instances. Triggers that opt out (the default) keep their independent
+``run()``-based poll loops untouched.
+
+Lifecycle invariants
+--------------------
+
+The manager and groups cooperate to keep a single invariant true at every
+``await``-point:
+
+ A key is present in :attr:`SharedStreamManager._groups` only while its
+ group's poll task is alive and accepting new subscribers.
+
+This rules out the late-subscriber races that the naive design admits — a
+new subscriber for a key whose poll has died or is in the middle of being
+torn down always falls through to "create a fresh group" rather than
+attaching to a dead one and hanging on an empty queue. The invariant is
+maintained synchronously:
+
+* When ``_poll`` ends for any reason other than cancellation (the upstream
+ iterator raised, or returned), the group's ``finally`` block evicts the
+ key from ``_groups`` and broadcasts a terminal sentinel to current
+ subscribers — all without yielding, so no other coroutine can interleave.
+* When the last subscriber leaves, :meth:`SharedStreamManager.unsubscribe`
+ evicts the key from ``_groups`` *before* awaiting ``group.stop()``, so a
+ new subscriber arriving while we wait for cancellation creates a fresh
+ group.
+* :meth:`SharedStreamManager.stop_all` clears ``_groups`` in one synchronous
+ step before awaiting any stop, applying the same rule to shutdown.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator, Callable, Hashable
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+if TYPE_CHECKING:
+ from structlog.stdlib import BoundLogger
+
+ from airflow.triggers.base import BaseEventTrigger
+
+log = structlog.get_logger(__name__)
+
+
+class _PollTerminated(Exception):
+ """
+ Raised inside subscribers when ``open_shared_stream`` returns without yielding more events.
+
+ Implementations are expected to run for the lifetime of the group; an
+ early return would otherwise leave subscribers waiting forever on an
+ empty queue.
+ """
+
+
+class _PollFailure:
+ """Sentinel propagated through subscriber queues when the shared poll ends."""
+
+ __slots__ = ("exc",)
+
+ def __init__(self, exc: BaseException) -> None:
+ self.exc = exc
+
+
+async def _drain(queue: asyncio.Queue) -> AsyncIterator[Any]:
+ """
+ Yield items from ``queue`` until a poll termination sentinel arrives.
+
+ Subscribers exit either by their consuming task being cancelled
+ (Airflow's standard idiom — :class:`CancelledError` propagates through
+ ``queue.get()``) or by the shared poll ending, in which case the
+ :class:`_PollFailure` sentinel re-raises here.
+ """
+ while True:
+ item = await queue.get()
+ if isinstance(item, _PollFailure):
+ raise item.exc
+ yield item
+
+
+class _SharedStreamGroup:
+ """One shared poll loop broadcasting raw events to N subscriber queues."""
+
+ def __init__(
+ self,
+ *,
+ key: Hashable,
+ trigger_class: type[BaseEventTrigger],
+ kwargs: dict[str, Any],
+ on_poll_terminate: Callable[[_SharedStreamGroup], None],
+ log: BoundLogger,
+ ) -> None:
+ self.key = key
+ self.trigger_class = trigger_class
+ self.kwargs = kwargs
+ self.log = log
+ self._on_poll_terminate = on_poll_terminate
+ self._subscribers: dict[int, asyncio.Queue] = {}
+ self._poll_task: asyncio.Task | None = None
+
+ def start(self) -> None:
+ """Start the underlying poll loop. Call exactly once per group."""
+ if self._poll_task is not None:
+ raise RuntimeError(f"Shared stream group {self.key!r} already started")
+ self._poll_task = asyncio.create_task(
+ self._poll(),
+ name=f"shared-stream-poll[{self.key!r}]",
+ )
+
+ async def _poll(self) -> None:
+ terminal_exc: BaseException | None = None
+ try:
+ async for raw_event in self.trigger_class.open_shared_stream(self.kwargs):
+ for queue in self._subscribers.values():
+ queue.put_nowait(raw_event)
+ terminal_exc = _PollTerminated(
+ f"open_shared_stream for {self.key!r} returned without raising; "
+ "shared streams are expected to run for the lifetime of the group"
+ )
+ except asyncio.CancelledError:
+ # ``stop()`` initiated this; the manager has already evicted the
+ # group and is awaiting our exit. Do not run the terminate path.
+ raise
+ except Exception as exc:
+ terminal_exc = exc
+ self.log.exception("Shared stream poll failed; propagating to subscribers", key=self.key)
+ finally:
+ if terminal_exc is not None:
+ # Synchronous: evict from the manager and broadcast the
+ # sentinel before returning to the loop, so no coroutine can
+ # observe ``_groups[key]`` pointing at a dead poll.
+ self._on_poll_terminate(self)
+ failure = _PollFailure(terminal_exc)
+ for queue in self._subscribers.values():
+ queue.put_nowait(failure)
+
+ def subscribe(self, trigger_id: int) -> AsyncIterator[Any]:
+ """Register ``trigger_id`` as a subscriber and return its raw event stream."""
+ if trigger_id in self._subscribers:
+ raise RuntimeError(f"Trigger {trigger_id} already subscribed to shared stream {self.key!r}")
+ queue: asyncio.Queue = asyncio.Queue()
+ self._subscribers[trigger_id] = queue
+ return _drain(queue)
+
+ def unsubscribe(self, trigger_id: int) -> None:
+ # Active subscribers exit through their consuming task being cancelled
+ # (Airflow's standard idiom); dropping the queue is enough here.
+ self._subscribers.pop(trigger_id, None)
+
+ def is_empty(self) -> bool:
+ return not self._subscribers
+
+ async def stop(self) -> None:
+ """Cancel the poll task if it is still running and wait for it to exit."""
+ if self._poll_task is None or self._poll_task.done():
+ return
+ self._poll_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._poll_task
+
+
+class SharedStreamManager:
Review Comment:
I have a design question: should we introduce a callback channel from subscriber back to producer?
In `DirectoryFileDeleteTrigger`, each subscriber's coroutine can delete its own file. With a Kafka
consumer, however, only the single consumer instance itself can commit the offset.
Here's the discussion I had with Claude:
---
Design question on the broader manager API: `filter_shared_stream` is a
one-way channel today. Events flow producer → subscriber, with no path back.
For upstreams whose consumption requires a side effect on a handle that only
the producer holds, this means the producer can't be told "the subscriber
accepted this; please advance/commit/ack".
Concrete example: `providers/apache/kafka/.../await_message.py` commits the
offset *only* after `apply_function` returns truthy (`if self.commit_offset:
await async_commit(...)`), so the commit is tied to the filter's accept/reject
decision, while the consumer handle lives in `open_shared_stream`. With this PR's
current shape, there is no way to express that: filters can't tell the producer
they've consumed an event. `DirectoryFileDeleteTrigger` only works because
`unlink` doesn't need the shared scanner's handle.
A few shapes that could close this:
1. **Yield `(event, ack)` from `open_shared_stream`.** The filter calls the ack
closure on accept. This is backwards-compatible with the plain-yield form. But
"first-ack commits" can break at-least-once delivery: subscriber A acks and dies,
and the offset advances past messages subscriber B hasn't seen.
2. **Producer waits for N acks per event before advancing.** Closes the
at-least-once gap but introduces real backpressure (a slow filter stalls the
upstream for the whole group), which interacts with the unbounded-queue concern
in the other thread.
3. **Watermark / commit-up-to-offset.** Aligns with real Kafka
consumer-group semantics but is a meaningfully different API surface.
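If shape 3 were pursued, its core bookkeeping fits in a few lines with no Kafka dependency. Each subscriber acks offsets as it finishes them, where "ack" means "done with this offset", whether the filter accepted or ignored the event. The group may only commit up to the lowest contiguous position every subscriber has reached. `WatermarkTracker` is hypothetical, not an Airflow or Kafka client API. Mapping `resume_offset()` onto a real commit call is left to the consumer.

```python
from __future__ import annotations


class WatermarkTracker:
    """Hypothetical per-group tracker for 'commit up to offset' semantics."""

    def __init__(self, subscribers: set[int], start: int = 0) -> None:
        # Next offset each subscriber still has to acknowledge contiguously.
        self._next: dict[int, int] = {s: start for s in subscribers}
        # Acks that arrived out of order, waiting for the gap to fill.
        self._pending: dict[int, set[int]] = {s: set() for s in subscribers}

    def ack(self, subscriber: int, offset: int) -> None:
        pending = self._pending[subscriber]
        pending.add(offset)
        # Advance this subscriber's contiguous frontier as far as possible.
        while self._next[subscriber] in pending:
            pending.remove(self._next[subscriber])
            self._next[subscriber] += 1

    def remove(self, subscriber: int) -> None:
        # A departing subscriber must not hold the watermark back forever.
        self._next.pop(subscriber, None)
        self._pending.pop(subscriber, None)

    def resume_offset(self) -> int | None:
        """Offset a restarted consumer could safely resume from, or None if no subscribers."""
        if not self._next:
            return None
        return min(self._next.values())


tracker = WatermarkTracker({1, 2})
for offset in (0, 1, 2):
    tracker.ack(1, offset)
tracker.ack(2, 1)  # out of order: subscriber 2 has not finished offset 0 yet
print(tracker.resume_offset())  # 0
tracker.ack(2, 0)
print(tracker.resume_offset())  # 2
```

The slowest subscriber still holds the commit point back, so this shares shape 2's slow-filter concern, but without stalling delivery of new events to the others.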
My preference for *this* PR: scope the feature to **idempotent / read-only /
subscriber-side-effect** upstreams (directory scans, polling REST APIs, Kafka
with `enable.auto.commit=true`), state that restriction explicitly in this
class's docstring and in `BaseEventTrigger.open_shared_stream`, and open a
tracking issue for the producer-side ack channel, to be designed against a
concrete Kafka or SQS consumer rather than speculatively.
Without this, the feature reads as "share any upstream" but in practice only
works for a narrow slice.
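The module docstring's rule that `unsubscribe` evicts the key *before* awaiting `group.stop()` can be illustrated with a toy manager. Everything here is hypothetical, since the PR's `SharedStreamManager` body is not part of this excerpt. `ToyManager` and `ToyGroup` use a short sleep in place of cancelling the poll task. The point is only that a subscriber arriving during that await gets a fresh group rather than the one being torn down.

```python
import asyncio
from collections.abc import Hashable


class ToyGroup:
    def __init__(self, key: Hashable) -> None:
        self.key = key
        self.subscribers: set[int] = set()
        self.stopped = False

    async def stop(self) -> None:
        # Stand-in for cancelling and awaiting the poll task.
        await asyncio.sleep(0.01)
        self.stopped = True


class ToyManager:
    def __init__(self) -> None:
        self._groups: dict[Hashable, ToyGroup] = {}

    def subscribe(self, trigger_id: int, key: Hashable) -> ToyGroup:
        group = self._groups.get(key)
        if group is None:
            group = self._groups[key] = ToyGroup(key)
        group.subscribers.add(trigger_id)
        return group

    async def unsubscribe(self, trigger_id: int, key: Hashable) -> None:
        group = self._groups.get(key)
        if group is None:
            return
        group.subscribers.discard(trigger_id)
        if not group.subscribers:
            # Evict synchronously, before the first await, so no other
            # coroutine can attach to a group that is shutting down.
            del self._groups[key]
            await group.stop()


async def main() -> tuple[bool, bool]:
    manager = ToyManager()
    key = ("queue", "a")
    old = manager.subscribe(1, key)
    # Tear down the only subscriber, then subscribe a new one mid-stop.
    teardown = asyncio.create_task(manager.unsubscribe(1, key))
    await asyncio.sleep(0)  # let teardown run up to its first await
    new = manager.subscribe(2, key)
    await teardown
    return new is not old, old.stopped and not new.stopped
```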
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]