xintongsong commented on code in PR #21415:
URL: https://github.com/apache/flink/pull/21415#discussion_r1041747359


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java:
##########
@@ -265,19 +265,24 @@ public int getBacklog() {
 
     private Optional<BufferIndexOrError> 
checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
             throws Throwable {
-        if (loadedBuffers.isEmpty()) {
-            return Optional.empty();
-        }
-
         BufferIndexOrError peek = loadedBuffers.peek();
-
-        if (peek.getThrowable().isPresent()) {
-            throw peek.getThrowable().get();
-        } else if (peek.getIndex() != expectedBufferIndex) {
-            return Optional.empty();
+        while (peek != null) {
+            if (peek.getThrowable().isPresent()) {
+                throw peek.getThrowable().get();
+            } else if (peek.getIndex() == expectedBufferIndex) {
+                break;
+            } else if (peek.getIndex() > expectedBufferIndex) {
+                return Optional.empty();
+            } else if (peek.getIndex() < expectedBufferIndex) {
+                // Because the update of consumption progress may be delayed, 
there is a
+                // very small probability to load the buffer that has been 
consumed from memory.
+                // Skip these buffers directly to avoid repeated consumption.
+                loadedBuffers.poll();
+                peek = loadedBuffers.peek();
+            }
         }
 
-        return Optional.of(peek);
+        return peek == null ? Optional.empty() : Optional.of(peek);

Review Comment:
   ```suggestion
           return Optional.ofNullable(peek);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to