FrankYang0529 commented on code in PR #67523:
URL: https://github.com/apache/airflow/pull/67523#discussion_r3413475688


##########
airflow-core/src/airflow/triggers/shared_stream.py:
##########
@@ -184,22 +553,165 @@ def start(self) -> None:
             name=f"shared-stream-poll[{self.key!r}]",
         )
 
+    def _request_pump_stop(self) -> None:
+        """
+        Ask the advance pump to drain the already-resolved lane heads and exit.
+
+        Synchronous (no await) so it can run inside ``_poll``'s terminal
+        section without yielding.
+        """
+        self._pump_stopping = True
+        self._advance_wakeup.set()
+
+    async def _run_advance_pump(self, producer: SharedStreamProducer) -> None:
+        """
+        Dispatch broker advances in per-lane fan-out order, one batch at a time.
+
+        A single task scans the lanes round-robin: each pass dispatches at
+        most one batch per lane — the lane's contiguous resolved prefix —
+        and passes repeat until one makes no progress. An advance that
+        raises is logged, its batch is abandoned to broker redelivery, and
+        the pump moves on. On stop the pump keeps passing until every
+        already-resolved dispatchable event is drained, abandons unresolved
+        entries to broker redelivery, and exits.
+        """
+        while True:
+            if not self._pump_stopping:
+                await self._advance_wakeup.wait()
+            self._advance_wakeup.clear()
+            progress = True
+            while progress:
+                progress = False
+                # Snapshot: lanes are added (by _poll) while we await below.
+                # A lane inserted during an await is invisible to this pass,
+                # but its appearance always comes with a wakeup.set() (born
+                # resolved) or a later resolve, so the next progress pass or
+                # the next wakeup picks it up — nothing is lost. A no-progress
+                # pass costs O(#lanes) and falls back to wait().
+                for lane in list(self._lane_queues):
+                    lane_queue = self._lane_queues[lane]
+                    # The deque head is always present in _outstanding: entry
+                    # deletion belongs to the pump alone, and it removes the
+                    # deque slot and the dict entry together below — a
+                    # KeyError here would be a bug. Harvest the lane's
+                    # contiguous resolved prefix synchronously, before the
+                    # await, so an advance that raises abandons the whole
+                    # batch to broker redelivery.
+                    batch: list[AdvanceItem] = []
+                    while lane_queue and not self._outstanding[lane_queue[0]].pending:
+                        head_id = lane_queue.popleft()
+                        entry = self._outstanding.pop(head_id)
+                        batch.append(
+                            AdvanceItem(
+                                entry.broker_payload,
+                                AdvanceOutcome(
+                                    acked=entry.acked, failed=entry.failed, rejected=entry.rejected
+                                ),
+                            )
+                        )
+                    if not lane_queue:
+                        # Sole lane GC point; _poll recreates the lane on demand.
+                        del self._lane_queues[lane]
+                    if not batch:
+                        continue
+                    try:
+                        await producer.advance(batch)
+                    except Exception as exc:
+                        self.log.error(
+                            "Producer advance raised; broker advance failed",
+                            key=self.key,
+                            lane=lane,
+                            batch_size=len(batch),
+                            exc_info=exc,
+                        )
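
The per-lane harvest in the hunk above can be modelled synchronously to show why a pending head holds back the rest of its lane. This is a simplified sketch, not the PR's code: `Entry`, `harvest_prefix`, and `run_pass` are hypothetical names, payloads are plain strings, and both the outer progress loop and the `await producer.advance(batch)` call are omitted.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass
class Entry:
    # Hypothetical stand-in for an _outstanding entry: only the fields the harvest reads.
    payload: str
    pending: bool = True


def harvest_prefix(lane_queue: deque[str], outstanding: dict[str, Entry]) -> list[str]:
    """Pop the lane's contiguous resolved prefix, stopping at the first pending head."""
    batch: list[str] = []
    while lane_queue and not outstanding[lane_queue[0]].pending:
        head_id = lane_queue.popleft()
        # The deque slot and the dict entry are removed together.
        batch.append(outstanding.pop(head_id).payload)
    return batch


def run_pass(lanes: dict[str, deque[str]], outstanding: dict[str, Entry]) -> dict[str, list[str]]:
    """One round-robin pass: at most one batch per lane; drained lanes are deleted."""
    dispatched: dict[str, list[str]] = {}
    for lane in list(lanes):  # snapshot, as in the PR, so lanes can be deleted mid-loop
        batch = harvest_prefix(lanes[lane], outstanding)
        if not lanes[lane]:
            del lanes[lane]
        if batch:
            dispatched[lane] = batch
    return dispatched


outstanding = {k: Entry(k, pending=(k == "c")) for k in "abcd"}
lanes = {"p0": deque("abcd")}
print(run_pass(lanes, outstanding))  # {'p0': ['a', 'b']}
```

Because `d` sits behind the pending `c`, it only ships once `c` resolves, which keeps each lane's advances in fan-out order.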

Review Comment:
   > An advance that raises is logged, its batch is abandoned to broker redelivery, and the pump moves on.
   
   In some cases, the abandoned batch may not be redelivered.
   
   Take a Kafka topic with a single partition as an example. In this case, offset 1 never reaches the DLQ:
   ```
   batch 1: [0, 1, 2] <- offset 1 is unclean and must be sent to the DLQ, but the request fails and raises an exception
   batch 2: [3, 4, 5] <- commits successfully
   ```
   Because Kafka offset commits are cumulative, committing batch 2 also marks offsets 0 to 2 as consumed, so batch 1 is never redelivered.
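   A toy replay of this example, in plain Python with no Kafka client, shows where batch 1 goes. `ToyPartition`, `advance`, and `run` are hypothetical: `advance` stands in for a producer that writes unclean offsets to the DLQ and then commits, and the `except` branch mirrors the pump's log-and-continue handling.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartition:
    """Toy model of one Kafka partition plus a consumer group's committed position."""

    log_end: int
    committed: int = 0  # next offset a restarted consumer would fetch
    dlq: list[int] = field(default_factory=list)

    def commit_through(self, offset: int) -> None:
        # Commits are cumulative: committing position N marks every offset < N as consumed.
        self.committed = max(self.committed, offset + 1)

    def redelivered_on_restart(self) -> list[int]:
        return list(range(self.committed, self.log_end))


def advance(part: ToyPartition, batch: list[int], unclean: set[int], fail: bool) -> None:
    """Hypothetical advance: route unclean offsets to the DLQ, then commit the batch."""
    if fail:
        raise ConnectionError("DLQ produce failed")  # simulated broker error
    part.dlq.extend(o for o in batch if o in unclean)
    part.commit_through(batch[-1])


def run(batches: list[list[int]], unclean: set[int], failing: set[int]) -> ToyPartition:
    part = ToyPartition(log_end=sum(len(b) for b in batches))
    for i, batch in enumerate(batches):
        try:
            advance(part, batch, unclean, fail=i in failing)
        except ConnectionError:
            pass  # mirrors the pump: log, abandon the batch, move on
    return part


part = run([[0, 1, 2], [3, 4, 5]], unclean={1}, failing={0})
print(part.committed, part.dlq, part.redelivered_on_restart())  # 6 [] []
```

   With `failing=set()` the same run puts offset 1 in the DLQ; with `failing={0}` the committed position still reaches 6, so nothing before it is fetched again.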
   
   Option 1: Let users choose whether a single exception fails the whole producer.
   Option 2: Define a retryable exception type, and only ignore the error if it is a retryable exception.
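   One way the two options could be combined, as a sketch only: `RetryableAdvanceError`, `dispatch_batch`, and the `fail_on_error` setting are hypothetical and do not exist in the PR. Errors marked retryable are logged and the batch is abandoned (Option 2); any other error propagates out of the pump unless the user opts out (Option 1).

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

log = logging.getLogger(__name__)


class RetryableAdvanceError(Exception):
    """Hypothetical marker: the broker will redeliver this batch, so skipping it is safe."""


async def dispatch_batch(
    advance: Callable[[list[int]], Awaitable[None]],
    batch: list[int],
    *,
    fail_on_error: bool = True,
) -> bool:
    """Return True if the batch advanced, False if it was abandoned; otherwise raise."""
    try:
        await advance(batch)
        return True
    except RetryableAdvanceError:
        # Option 2: only errors the producer declares retryable are swallowed.
        log.warning("advance failed with a retryable error; abandoning batch", exc_info=True)
        return False
    except Exception:
        # Option 1: the user decides whether any other error stops the producer.
        if fail_on_error:
            raise
        log.exception("advance failed; abandoning batch (fail_on_error=False)")
        return False


if __name__ == "__main__":

    async def transient(batch: list[int]) -> None:
        raise RetryableAdvanceError("broker timeout")

    print(asyncio.run(dispatch_batch(transient, [0, 1, 2])))  # False
```

   Defaulting `fail_on_error` to `True` is the conservative choice for brokers like Kafka, where an abandoned batch can be silently skipped by a later cumulative commit.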



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.