This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d1a3ad0  [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already
d1a3ad0 is described below

commit d1a3ad01a96d2d565c5311e4d786917d0c71e8af
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Wed Nov 3 16:13:42 2021 +0100

    [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already
---
 .../partition/consumer/LocalInputChannel.java      |  6 ++-
 .../partition/consumer/RemoteInputChannel.java     |  1 +
 .../partition/consumer/SingleInputGate.java        |  4 +-
 .../network/partition/InputChannelTestUtils.java   | 28 +++++++++++
 .../partition/consumer/LocalInputChannelTest.java  | 57 +++++++++++-----------
 .../partition/consumer/RemoteInputChannelTest.java |  9 ++++
 .../partition/consumer/SingleInputGateTest.java    | 33 +++++++++++++
 .../tasks/bufferdebloat/BufferDebloater.java       |  4 +-
 .../tasks/bufferdebloat/BufferDebloaterTest.java   |  2 +-
 9 files changed, 111 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 120ffe9..13c6538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -340,8 +340,10 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
     void announceBufferSize(int newBufferSize) {
         checkState(!isReleased, "Channel released.");
 
-        ResultSubpartitionView subpartitionView = 
checkNotNull(this.subpartitionView);
-        subpartitionView.notifyNewBufferSize(newBufferSize);
+        ResultSubpartitionView view = this.subpartitionView;
+        if (view != null) {
+            view.notifyNewBufferSize(newBufferSize);
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 07de42c..90a5da2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -324,6 +324,7 @@ public class RemoteInputChannel extends InputChannel {
     }
 
     private void notifyNewBufferSize(int newBufferSize) throws IOException {
+        checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
 
         partitionRequestClient.notifyNewBufferSize(this, newBufferSize);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 7652c20..c4584da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -386,7 +386,9 @@ public class SingleInputGate extends IndexedInputGate {
     @Override
     public void announceBufferSize(int newBufferSize) {
         for (InputChannel channel : channels) {
-            channel.announceBufferSize(newBufferSize);
+            if (!channel.isReleased()) {
+                channel.announceBufferSize(newBufferSize);
+            }
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index c23f215..8abcdd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -36,10 +38,12 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.function.Consumer;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -215,6 +219,30 @@ public class InputChannelTestUtils {
     /** This class is not meant to be instantiated. */
     private InputChannelTestUtils() {}
 
+    public static ResultSubpartitionView createResultSubpartitionView(boolean 
addBuffer)
+            throws IOException {
+        return addBuffer
+                ? 
createResultSubpartitionView(createFilledFinishedBufferConsumer(4096))
+                : createResultSubpartitionView();
+    }
+
+    public static ResultSubpartitionView 
createResultSubpartitionView(BufferConsumer... buffers)
+            throws IOException {
+        int bufferSize = 4096;
+        PipelinedResultPartition parent =
+                (PipelinedResultPartition)
+                        PartitionTestUtils.createPartition(
+                                ResultPartitionType.PIPELINED,
+                                NoOpFileChannelManager.INSTANCE,
+                                true,
+                                bufferSize);
+        ResultSubpartition subpartition = parent.getAllPartitions()[0];
+        for (BufferConsumer buffer : buffers) {
+            subpartition.add(buffer);
+        }
+        return subpartition.createReadView(() -> {});
+    }
+
     /** Test stub for {@link MemorySegmentProvider}. */
     public static class StubMemorySegmentProvider implements 
MemorySegmentProvider {
         private static final MemorySegmentProvider INSTANCE = new 
StubMemorySegmentProvider();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 991ef74..46c005e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
@@ -71,6 +72,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
@@ -118,7 +120,8 @@ public class LocalInputChannelTest {
                 InputChannelBuilder.newBuilder()
                         .setPartitionManager(
                                 new TestingResultPartitionManager(
-                                        
createResultSubpartitionView(barrierHolder, data)))
+                                        
InputChannelTestUtils.createResultSubpartitionView(
+                                                barrierHolder, data)))
                         .setStateWriter(stateWriter)
                         .buildLocalChannel(new 
SingleInputGateBuilder().build());
         channel.requestSubpartition(0);
@@ -474,7 +477,8 @@ public class LocalInputChannelTest {
      */
     @Test
     public void testGetNextAfterPartitionReleased() throws Exception {
-        ResultSubpartitionView subpartitionView = 
createResultSubpartitionView(false);
+        ResultSubpartitionView subpartitionView =
+                InputChannelTestUtils.createResultSubpartitionView(false);
         TestingResultPartitionManager partitionManager =
                 new TestingResultPartitionManager(subpartitionView);
         LocalInputChannel channel =
@@ -499,7 +503,8 @@ public class LocalInputChannelTest {
     /** Verifies that buffer is not compressed when getting from a {@link 
LocalInputChannel}. */
     @Test
     public void testGetBufferFromLocalChannelWhenCompressionEnabled() throws 
Exception {
-        ResultSubpartitionView subpartitionView = 
createResultSubpartitionView(true);
+        ResultSubpartitionView subpartitionView =
+                InputChannelTestUtils.createResultSubpartitionView(true);
         TestingResultPartitionManager partitionManager =
                 new TestingResultPartitionManager(subpartitionView);
         LocalInputChannel channel =
@@ -523,6 +528,26 @@ public class LocalInputChannelTest {
         localChannel.resumeConsumption();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testAnnounceBufferSize() throws Exception {
+        // given: Initialized local input channel.
+        AtomicInteger lastBufferSize = new AtomicInteger(0);
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(
+                        
InputChannelTestUtils.createResultSubpartitionView(true));
+        SingleInputGate inputGate = createSingleInputGate(1);
+        LocalInputChannel localChannel = createLocalInputChannel(inputGate, 
partitionManager);
+        localChannel.requestSubpartition(0);
+
+        localChannel.announceBufferSize(10);
+
+        // when: Release all resources.
+        localChannel.releaseAllResources();
+
+        // then: Announcement buffer size should lead to exception.
+        localChannel.announceBufferSize(12);
+    }
+
     @Test
     public void testEnqueueAvailableChannelWhenResuming() throws IOException, 
InterruptedException {
         PipelinedResultPartition parent =
@@ -647,7 +672,7 @@ public class LocalInputChannelTest {
     public void 
testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws 
Exception {
         // given: Local input channel without initialized subpartition view.
         ResultSubpartitionView subpartitionView =
-                createResultSubpartitionView(
+                InputChannelTestUtils.createResultSubpartitionView(
                         createFilledFinishedBufferConsumer(4096),
                         createFilledFinishedBufferConsumer(4096),
                         createFilledFinishedBufferConsumer(4096));
@@ -670,30 +695,6 @@ public class LocalInputChannelTest {
 
     // 
---------------------------------------------------------------------------------------------
 
-    private static ResultSubpartitionView createResultSubpartitionView(boolean 
addBuffer)
-            throws IOException {
-        return addBuffer
-                ? 
createResultSubpartitionView(createFilledFinishedBufferConsumer(4096))
-                : createResultSubpartitionView();
-    }
-
-    private static ResultSubpartitionView 
createResultSubpartitionView(BufferConsumer... buffers)
-            throws IOException {
-        int bufferSize = 4096;
-        PipelinedResultPartition parent =
-                (PipelinedResultPartition)
-                        PartitionTestUtils.createPartition(
-                                ResultPartitionType.PIPELINED,
-                                NoOpFileChannelManager.INSTANCE,
-                                true,
-                                bufferSize);
-        ResultSubpartition subpartition = parent.getAllPartitions()[0];
-        for (BufferConsumer buffer : buffers) {
-            subpartition.add(buffer);
-        }
-        return subpartition.createReadView(() -> {});
-    }
-
     /** Returns the configured number of buffers for each channel in a random 
order. */
     private static class TestPartitionProducerBufferSource implements 
TestProducerSource {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index a098f24..9c0418b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1408,6 +1408,15 @@ public class RemoteInputChannelTest {
         remoteChannel.resumeConsumption();
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testReleasedChannelAnnounceBufferSize() throws Exception {
+        SingleInputGate inputGate = createSingleInputGate(1);
+        RemoteInputChannel remoteChannel = createRemoteInputChannel(inputGate);
+
+        remoteChannel.releaseAllResources();
+        remoteChannel.announceBufferSize(10);
+    }
+
     @Test
     public void testOnUpstreamBlockedAndResumed() throws Exception {
         BufferPool bufferPool = new TestBufferPool();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a14d499..09e3bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -78,6 +78,8 @@ import static java.util.Arrays.asList;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultSubpartitionView;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
@@ -854,6 +856,37 @@ public class SingleInputGateTest extends InputGateTestBase 
{
     }
 
     @Test
+    public void testAnnounceBufferSize() throws Exception {
+        final SingleInputGate inputGate = createSingleInputGate(2);
+        final LocalInputChannel localChannel =
+                createLocalInputChannel(
+                        inputGate,
+                        new 
TestingResultPartitionManager(createResultSubpartitionView()));
+        RemoteInputChannel remoteInputChannel = 
createRemoteInputChannel(inputGate, 1);
+
+        inputGate.setInputChannels(localChannel, remoteInputChannel);
+        inputGate.requestPartitions();
+
+        inputGate.announceBufferSize(10);
+
+        // Release all channels and gate one by one.
+
+        localChannel.releaseAllResources();
+
+        inputGate.announceBufferSize(11);
+
+        remoteInputChannel.releaseAllResources();
+
+        inputGate.announceBufferSize(12);
+
+        inputGate.close();
+
+        inputGate.announceBufferSize(13);
+
+        // No exceptions should happen.
+    }
+
+    @Test
     public void testInputGateRemovalFromNettyShuffleEnvironment() throws 
Exception {
         NettyShuffleEnvironment network = createNettyShuffleEnvironment();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
index 5672db9..d1b3dab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
@@ -96,7 +96,9 @@ public class BufferDebloater {
 
         lastBufferSize = newSize;
         for (IndexedInputGate inputGate : inputGates) {
-            inputGate.announceBufferSize(newSize);
+            if (!inputGate.isFinished()) {
+                inputGate.announceBufferSize(newSize);
+            }
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
index dc1dd2d..1fe7b53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
@@ -218,7 +218,7 @@ public class BufferDebloaterTest extends TestLogger {
 
         public TestBufferSizeInputGate(int bufferInUseCount) {
             // Number of channels don't make sense here because
-            super(1, Collections.emptyList());
+            super(1, Collections.emptyList(), false);
             this.bufferInUseCount = bufferInUseCount;
         }
 

Reply via email to