This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new f91bd77  [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
f91bd77 is described below

commit f91bd772de866a48d65dfcb31d4ef0d1ef2c001e
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Thu Oct 28 12:15:48 2021 +0200

    [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
---
 .../partition/consumer/LocalInputChannel.java      |  3 ++-
 .../partition/consumer/LocalInputChannelTest.java  | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index ba1c075..120ffe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -346,7 +346,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
     @Override
     int getBuffersInUseCount() {
-        return subpartitionView.getNumberOfQueuedBuffers();
+        ResultSubpartitionView view = this.subpartitionView;
+        return view == null ? 0 : view.getNumberOfQueuedBuffers();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f3f33b8..991ef74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -643,6 +643,31 @@ public class LocalInputChannelTest {
         assertEquals(20, subpartition1.add(createFilledFinishedBufferConsumer(16)));
     }
 
+    @Test
+    public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
+        // given: Local input channel without initialized subpartition view.
+        ResultSubpartitionView subpartitionView =
+                createResultSubpartitionView(
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096));
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+        final SingleInputGate inputGate = createSingleInputGate(1);
+        final LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager);
+
+        inputGate.setInputChannels(localChannel);
+
+        // then: Buffers in use should be equal to 0 until subpartition view initialization.
+        assertEquals(0, localChannel.getBuffersInUseCount());
+
+        // when: The subpartition view is initialized.
+        localChannel.requestSubpartition(0);
+
+        // then: Buffers in use should show correct value.
+        assertEquals(3, localChannel.getBuffersInUseCount());
+    }
+
     // ---------------------------------------------------------------------------------------------
 
     private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)