This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 42f05f0cddd45b4ec54f1e7c52796e5e26cfbf6f Author: Weijie Guo <res...@163.com> AuthorDate: Thu Jul 28 17:19:36 2022 +0800 [hotfix] HsMemoryDataManager spillAsync's callback should assertNoException. --- .../io/network/partition/hybrid/HsMemoryDataManager.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index b58338b6912..0f0744fe9ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; @@ -222,13 +223,13 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData // decrease numUnSpillBuffers as this subpartition's buffer is spill. numUnSpillBuffers.getAndAdd(-bufferIndexAndChannels.size()); }); - - spiller.spillAsync(bufferWithIdentities) - .thenAccept( - spilledBuffers -> { - fileDataIndex.addBuffers(spilledBuffers); - spillingCompleteFuture.complete(null); - }); + FutureUtils.assertNoException( + spiller.spillAsync(bufferWithIdentities) + .thenAccept( + spilledBuffers -> { + fileDataIndex.addBuffers(spilledBuffers); + spillingCompleteFuture.complete(null); + })); } /**