This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f91bd77  [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
f91bd77 is described below

commit f91bd772de866a48d65dfcb31d4ef0d1ef2c001e
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Thu Oct 28 12:15:48 2021 +0200

    [FLINK-24671][runtime] Return 0 buffers in use until subpartition view initialization in order to avoid NPE
---
 .../partition/consumer/LocalInputChannel.java      |  3 ++-
 .../partition/consumer/LocalInputChannelTest.java  | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index ba1c075..120ffe9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -346,7 +346,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
     @Override
     int getBuffersInUseCount() {
-        return subpartitionView.getNumberOfQueuedBuffers();
+        ResultSubpartitionView view = this.subpartitionView;
+        return view == null ? 0 : view.getNumberOfQueuedBuffers();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f3f33b8..991ef74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -643,6 +643,31 @@ public class LocalInputChannelTest {
         assertEquals(20, subpartition1.add(createFilledFinishedBufferConsumer(16)));
     }
 
+    @Test
+    public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
+        // given: Local input channel without initialized subpartition view.
+        ResultSubpartitionView subpartitionView =
+                createResultSubpartitionView(
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096),
+                        createFilledFinishedBufferConsumer(4096));
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+        final SingleInputGate inputGate = createSingleInputGate(1);
+        final LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager);
+
+        inputGate.setInputChannels(localChannel);
+
+        // then: Buffers in use should be equal to 0 until subpartition view initialization.
+        assertEquals(0, localChannel.getBuffersInUseCount());
+
+        // when: The subpartition view is initialized.
+        localChannel.requestSubpartition(0);
+
+        // then: Buffers in use should show correct value.
+        assertEquals(3, localChannel.getBuffersInUseCount());
+    }
+
     // ---------------------------------------------------------------------------------------------
 
     private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer)

Reply via email to