reswqa commented on code in PR #20371: URL: https://github.com/apache/flink/pull/20371#discussion_r930867604
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsBufferContext.java: ########## @@ -97,24 +97,40 @@ public Optional<CompletableFuture<Void>> getSpilledFuture() { return Optional.ofNullable(spilledFuture); } - public void release() { - checkState(!released, "Release buffer repeatedly is unexpected."); + /** + * Mark buffer status to release. + * + * @return Whether the status has been modified successfully. If it has been released, false + * will be returned. + */ + public boolean release() { + if (isReleased()) { + return false; + } released = true; // decrease ref count when buffer is released from memory. buffer.recycleBuffer(); + return true; } - public void startSpilling(CompletableFuture<Void> spilledFuture) { - checkState(!released, "Buffer is already released."); - checkState( - !spillStarted && this.spilledFuture == null, - "Spill buffer repeatedly is unexpected."); + /** + * Mark buffer status to startSpilling. + * + * @param spilledFuture completable future of this buffer's spilling operation. + * @return Whether the status has been modified successfully. If it has been released or is in + * startSpilling status, false will be returned. + */ + public boolean startSpilling(CompletableFuture<Void> spilledFuture) { + if (isReleased() || isSpillStarted()) { Review Comment: It is possible that thread `A` decides to release the buffer, and another thread `B` decides to start spilling before `A` marks this buffer as released. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org