xintongsong commented on code in PR #21415: URL: https://github.com/apache/flink/pull/21415#discussion_r1041747359
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java: ########## @@ -265,19 +265,24 @@ public int getBacklog() { private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex) throws Throwable { - if (loadedBuffers.isEmpty()) { - return Optional.empty(); - } - BufferIndexOrError peek = loadedBuffers.peek(); - - if (peek.getThrowable().isPresent()) { - throw peek.getThrowable().get(); - } else if (peek.getIndex() != expectedBufferIndex) { - return Optional.empty(); + while (peek != null) { + if (peek.getThrowable().isPresent()) { + throw peek.getThrowable().get(); + } else if (peek.getIndex() == expectedBufferIndex) { + break; + } else if (peek.getIndex() > expectedBufferIndex) { + return Optional.empty(); + } else if (peek.getIndex() < expectedBufferIndex) { + // Because the update of consumption progress may be delayed, there is a + // very small probability to load the buffer that has been consumed from memory. + // Skip these buffers directly to avoid repeated consumption. + loadedBuffers.poll(); + peek = loadedBuffers.peek(); + } } - return Optional.of(peek); + return peek == null ? Optional.empty() : Optional.of(peek); Review Comment: ```suggestion return Optional.ofNullable(peek); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org