1996fanrui commented on code in PR #28107:
URL: https://github.com/apache/flink/pull/28107#discussion_r3197698705


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java:
##########
@@ -278,24 +273,56 @@ protected int peekNextBufferSubpartitionIdInternal() throws IOException {
 
     @Override
     public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
-        final SequenceBuffer next;
+        // Single critical section so "is recovery done", "is there a priority event", "what is
+        // the next data type" cannot be torn against the underlying queues. Splitting would let
+        // producers slip data into receivedBuffers (or drain the store) between segments and
+        // surface a stale moreAvailable that hides queued buffers from the gate.
+        final Buffer recoveredBuffer;
+        final SequenceBuffer fromReceivedBuffers;
         final DataType nextDataType;
 
-        synchronized (receivedBuffers) {
-            checkReadability();
-
-            next = receivedBuffers.poll();
-
-            if (next != null) {
-                totalQueueSizeInBytes -= next.buffer.getSize();
+        synchronized (recoveredStore) {
+            if (!recoveredStore.isEmpty()) {
+                if (hasPendingPriorityEvent) {
+                    fromReceivedBuffers = pollPendingPriorityEvent();
+                    if (fromReceivedBuffers == null) {
+                        // Invariant should keep the flag aligned with priority count; defensive
+                        // yield mirrors pre-refactor behavior.
+                        return Optional.empty();
+                    }
+                    nextDataType = peekNextDataType();
+                    recoveredBuffer = null;
+                } else {
+                    recoveredBuffer = recoveredStore.tryTake();

Review Comment:
   > I'm concerned about the complexity around consuming side (channels).
   
   I couldn't agree more. This is the **_main reason_** the PR is so complex, and the complexity **_far exceeds_** my initial expectations.
   
   > Why can't we add buffers directly to channels, e.g. to RemoteInputChannel.receivedBuffers
   
   Currently, `receivedBuffers` only supports heap and network buffers, so disk data cannot be handled there.
   
   To resolve this, we have:
   
   - Enabled disk buffer snapshots on `InputChannel` during checkpointing.
   - Ensured all recovered buffers (network and disk) are fully consumed before processing (receiving) new upstream data.
   
   Implementation logic for this PR:
   
   - Introduce `recoveredStore` to hold all filtered recovered buffers (network buffers and disk buffers).
   - The dispatcher loads buffers from disk into network buffers asynchronously (and wakes up the task thread to continue consuming).
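
As a rough illustration of the ordering described above, here is a minimal sketch, not the code in this PR. The class `RecoveredFirstChannelSketch`, its methods, and the use of plain strings in place of buffers are all hypothetical. It assumes a single lock guards every queue, that pending priority events may overtake recovered data, and that upstream data is handed out only once `recoveredStore` is empty. Asynchronous loading from disk is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical, simplified model of the consume order; not Flink's RemoteInputChannel. */
final class RecoveredFirstChannelSketch {
    // One lock for all queues, so the "which queue is next" decision cannot be torn.
    private final Object lock = new Object();
    private final Queue<String> recoveredStore = new ArrayDeque<>();
    private final Queue<String> receivedBuffers = new ArrayDeque<>();
    private final Queue<String> priorityEvents = new ArrayDeque<>();

    void addRecovered(String buffer) {
        synchronized (lock) {
            recoveredStore.add(buffer);
        }
    }

    void onUpstreamBuffer(String buffer) {
        synchronized (lock) {
            receivedBuffers.add(buffer);
        }
    }

    void onPriorityEvent(String event) {
        synchronized (lock) {
            priorityEvents.add(event);
        }
    }

    /** Returns the next element: priority events, then recovered data, then upstream data. */
    Optional<String> getNextBuffer() {
        synchronized (lock) {
            if (!priorityEvents.isEmpty()) {
                // Assumption: priority events are allowed to overtake recovered buffers.
                return Optional.of(priorityEvents.poll());
            }
            if (!recoveredStore.isEmpty()) {
                // Recovered buffers must be fully drained before any upstream data.
                return Optional.of(recoveredStore.poll());
            }
            // poll() returns null on an empty queue, which maps to Optional.empty().
            return Optional.ofNullable(receivedBuffers.poll());
        }
    }
}
```

In this model, an upstream buffer that arrives while recovery is still in progress simply waits in `receivedBuffers` until the store is empty.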



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.