Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3413801193


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -80,19 +156,92 @@
 from __future__ import annotations
 
 import asyncio
-from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable
+import itertools
+import time
+from abc import ABC, abstractmethod
+from collections import deque
+from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable, Iterable, Iterator, Sequence
 from contextlib import suppress
-from typing import TYPE_CHECKING, Any
+from contextvars import ContextVar
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple
 
 import structlog
 
+from airflow.triggers.base import BaseEventTrigger
+
 if TYPE_CHECKING:
     from structlog.stdlib import BoundLogger
 
-    from airflow.triggers.base import BaseEventTrigger
-
 log = structlog.get_logger(__name__)
 
+__all__ = [
+    "AckTimeout",
+    "AdvanceItem",
+    "AdvanceOutcome",
+    "SharedStreamManager",
+    "SharedStreamProducer",
+    "reject_shared_stream_event",
+]
+
+
+class _RejectContext(NamedTuple):
+    """
+    The open binding window in scope while a subscriber's filter runs in ack mode.
+
+    Set by :meth:`_SharedStreamGroup._ack_drain` for the duration of one raw
+    event's yield and read by :func:`reject_shared_stream_event`.
+    """
+
+    group: _SharedStreamGroup
+    trigger_id: int
+    event_id: int
+
+
+_reject_context: ContextVar[_RejectContext | None] = ContextVar("shared_stream_reject_context", default=None)
+"""Task-local handle to the binding window of the raw event currently being filtered.
+
+Only set while a subscriber's :meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream`
+is processing a raw event in ack mode; ``None`` otherwise (fast path, standalone
+``run()``, or between raw events). See :func:`reject_shared_stream_event`.
+"""
+
+
+def reject_shared_stream_event() -> None:
+    """
+    Reject the shared-stream raw event the calling filter is currently processing.
+
+    Call this from inside
+    :meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` when,
+    instead of yielding a :class:`~airflow.triggers.base.TriggerEvent`, you
+    want the broker to treat the raw event as terminally refused — dead-letter
+    it (Azure Service Bus), ``nack`` it (Pub/Sub), and so on. A reject is
+    distinct from an involuntary failure: it counts toward
+    :attr:`AdvanceOutcome.rejected` rather than ``failed``, so a producer can
+    dead-letter rejects while still redelivering failures.
+
+    The reject resolves the event immediately; there is no derived trigger
+    event to persist first.
+
+    Only meaningful while a raw event's binding window is open — that is,
+    inside ``filter_shared_stream`` of an ack-mode stream, right after the
+    filter received a raw event. Called anywhere else (the fast path,
+    standalone ``run()``, or between two raw events) it logs a warning and is a
+    no-op, because there is no broker advance to influence.
+
+    Invariant: this relies on the filter running in the same asyncio task as
+    the one driving the raw-event binding window (a task-local context variable
+    carries the open window). Driving the filter iteration from a different task
+    (for example via :func:`asyncio.to_thread` or a freshly created task) would
+    make the open window invisible here and turn every reject into a no-op.
+    """
+    ctx = _reject_context.get()
+    if ctx is None:
+        log.warning("reject_shared_stream_event called outside an ack-mode binding window; ignored")
+        return
+    ctx.group._reject_pending_event(trigger_id=ctx.trigger_id, event_id=ctx.event_id)
+

Review Comment:
   yep, added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to