Lee-W commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3413808035
##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -136,6 +295,177 @@ def __init__(self, exc: BaseException) -> None:
self.exc = exc
+class AckTimeout(Exception):
+ """
+ Raised in a subscriber that did not finish processing an event within the
per-event timeout.
+
+ The subscriber is either still on the event, or some of its derived
+ trigger events were never confirmed persisted. Treated the same as
+ :class:`_SubscriberOverflow` — the subscriber's trigger fails through
+ the standard trigger-failure path. Other subscribers in the same group
+ are unaffected; once they resolve, the producer advances normally.
+ """
+
+
+@dataclass(frozen=True, slots=True)
+class AdvanceOutcome:
+ """
+ Per-event resolution counts handed to :meth:`SharedStreamProducer.advance`.
+
+ Every subscriber that was online when the event was broadcast is counted
+ in exactly one field:
+
+ * ``acked`` — the subscriber moved past the event (pulled the next raw
+ event, or unsubscribed while the event was outstanding) and every
+ trigger event it derived from the event was confirmed persisted.
+ * ``failed`` — force-failed by the manager (ack timeout — including a
+ persist confirmation that never arrived — or queue overflow).
+ * ``rejected`` — the subscriber actively refused the event by calling
+ :func:`reject_shared_stream_event` from its filter. Terminal: the
+ broker should dead-letter / ``nack`` such events rather than redeliver
+ them, which is what distinguishes a reject from an involuntary
+ ``failed`` (where redelivery is the right response).
+
+ A producer reconciles these per-broker in
:meth:`SharedStreamProducer.advance`
+ — the framework only reports the counts. For example a Service Bus producer
+ dead-letters when ``rejected`` is non-zero, abandons (redelivers) when only
+ ``failed`` is non-zero, and completes when every subscriber accepted the
+ event; a Pub/Sub producer ``nack`` s on a reject and ``ack`` s otherwise.
+
+ An event broadcast while no subscribers were online carries all-zero
+ counts and is clean.
+ """
+
+ acked: int
+ failed: int
+ rejected: int = 0
+
+ @property
+ def is_clean(self) -> bool:
+ """Whether every subscriber accepted the event — no active reject and
no involuntary failure."""
+ return self.failed == 0 and self.rejected == 0
Review Comment:
`is_clean` now requires `acked > 0`, so a zero-subscriber broadcast
(all-zero counts) is no longer reported as clean
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]