This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new d1a3ad0 [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already d1a3ad0 is described below commit d1a3ad01a96d2d565c5311e4d786917d0c71e8af Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Wed Nov 3 16:13:42 2021 +0100 [FLINK-24738][runtime] Ignoring buffer size announcement if the channel is released already --- .../partition/consumer/LocalInputChannel.java | 6 ++- .../partition/consumer/RemoteInputChannel.java | 1 + .../partition/consumer/SingleInputGate.java | 4 +- .../network/partition/InputChannelTestUtils.java | 28 +++++++++++ .../partition/consumer/LocalInputChannelTest.java | 57 +++++++++++----------- .../partition/consumer/RemoteInputChannelTest.java | 9 ++++ .../partition/consumer/SingleInputGateTest.java | 33 +++++++++++++ .../tasks/bufferdebloat/BufferDebloater.java | 4 +- .../tasks/bufferdebloat/BufferDebloaterTest.java | 2 +- 9 files changed, 111 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 120ffe9..13c6538 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -340,8 +340,10 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit void announceBufferSize(int newBufferSize) { checkState(!isReleased, "Channel released."); - ResultSubpartitionView subpartitionView = checkNotNull(this.subpartitionView); - subpartitionView.notifyNewBufferSize(newBufferSize); + ResultSubpartitionView view = this.subpartitionView; + if (view != null) { + view.notifyNewBufferSize(newBufferSize); + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 07de42c..90a5da2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -324,6 +324,7 @@ public class RemoteInputChannel extends InputChannel { } private void notifyNewBufferSize(int newBufferSize) throws IOException { + checkState(!isReleased.get(), "Channel released."); checkPartitionRequestQueueInitialized(); partitionRequestClient.notifyNewBufferSize(this, newBufferSize); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 7652c20..c4584da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -386,7 +386,9 @@ public class SingleInputGate extends IndexedInputGate { @Override public void announceBufferSize(int newBufferSize) { for (InputChannel channel : channels) { - channel.announceBufferSize(newBufferSize); + if (!channel.isReleased()) { + channel.announceBufferSize(newBufferSize); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index c23f215..8abcdd1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -21,9 +21,11 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.PartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; @@ -36,10 +38,12 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.function.Consumer; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; @@ -215,6 +219,30 @@ public class InputChannelTestUtils { /** This class is not meant to be instantiated. */ private InputChannelTestUtils() {} + public static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) + throws IOException { + return addBuffer + ? createResultSubpartitionView(createFilledFinishedBufferConsumer(4096)) + : createResultSubpartitionView(); + } + + public static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers) + throws IOException { + int bufferSize = 4096; + PipelinedResultPartition parent = + (PipelinedResultPartition) + PartitionTestUtils.createPartition( + ResultPartitionType.PIPELINED, + NoOpFileChannelManager.INSTANCE, + true, + bufferSize); + ResultSubpartition subpartition = parent.getAllPartitions()[0]; + for (BufferConsumer buffer : buffers) { + subpartition.add(buffer); + } + return subpartition.createReadView(() -> {}); + } + /** Test stub for {@link MemorySegmentProvider}. */ public static class StubMemorySegmentProvider implements MemorySegmentProvider { private static final MemorySegmentProvider INSTANCE = new StubMemorySegmentProvider(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 991ef74..46c005e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition; @@ -71,6 +72,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel; @@ -118,7 +120,8 @@ public class LocalInputChannelTest { InputChannelBuilder.newBuilder() .setPartitionManager( new TestingResultPartitionManager( - createResultSubpartitionView(barrierHolder, data))) + InputChannelTestUtils.createResultSubpartitionView( + barrierHolder, data))) .setStateWriter(stateWriter) .buildLocalChannel(new SingleInputGateBuilder().build()); channel.requestSubpartition(0); @@ -474,7 +477,8 @@ public class LocalInputChannelTest { */ @Test public void testGetNextAfterPartitionReleased() throws Exception { - ResultSubpartitionView subpartitionView = createResultSubpartitionView(false); + ResultSubpartitionView subpartitionView = + InputChannelTestUtils.createResultSubpartitionView(false); TestingResultPartitionManager partitionManager = new TestingResultPartitionManager(subpartitionView); LocalInputChannel channel = @@ -499,7 +503,8 @@ public class LocalInputChannelTest { /** Verifies that buffer is not compressed when getting from a {@link LocalInputChannel}. */ @Test public void testGetBufferFromLocalChannelWhenCompressionEnabled() throws Exception { - ResultSubpartitionView subpartitionView = createResultSubpartitionView(true); + ResultSubpartitionView subpartitionView = + InputChannelTestUtils.createResultSubpartitionView(true); TestingResultPartitionManager partitionManager = new TestingResultPartitionManager(subpartitionView); LocalInputChannel channel = @@ -523,6 +528,26 @@ public class LocalInputChannelTest { localChannel.resumeConsumption(); } + @Test(expected = IllegalStateException.class) + public void testAnnounceBufferSize() throws Exception { + // given: Initialized local input channel. + AtomicInteger lastBufferSize = new AtomicInteger(0); + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager( + InputChannelTestUtils.createResultSubpartitionView(true)); + SingleInputGate inputGate = createSingleInputGate(1); + LocalInputChannel localChannel = createLocalInputChannel(inputGate, partitionManager); + localChannel.requestSubpartition(0); + + localChannel.announceBufferSize(10); + + // when: Release all resources. + localChannel.releaseAllResources(); + + // then: Announcement buffer size should lead to exception. + localChannel.announceBufferSize(12); + } + @Test public void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException { PipelinedResultPartition parent = @@ -647,7 +672,7 @@ public class LocalInputChannelTest { public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception { // given: Local input channel without initialized subpartition view. ResultSubpartitionView subpartitionView = - createResultSubpartitionView( + InputChannelTestUtils.createResultSubpartitionView( createFilledFinishedBufferConsumer(4096), createFilledFinishedBufferConsumer(4096), createFilledFinishedBufferConsumer(4096)); @@ -670,30 +695,6 @@ public class LocalInputChannelTest { // --------------------------------------------------------------------------------------------- - private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) - throws IOException { - return addBuffer - ? createResultSubpartitionView(createFilledFinishedBufferConsumer(4096)) - : createResultSubpartitionView(); - } - - private static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers) - throws IOException { - int bufferSize = 4096; - PipelinedResultPartition parent = - (PipelinedResultPartition) - PartitionTestUtils.createPartition( - ResultPartitionType.PIPELINED, - NoOpFileChannelManager.INSTANCE, - true, - bufferSize); - ResultSubpartition subpartition = parent.getAllPartitions()[0]; - for (BufferConsumer buffer : buffers) { - subpartition.add(buffer); - } - return subpartition.createReadView(() -> {}); - } - /** Returns the configured number of buffers for each channel in a random order. */ private static class TestPartitionProducerBufferSource implements TestProducerSource { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index a098f24..9c0418b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -1408,6 +1408,15 @@ public class RemoteInputChannelTest { remoteChannel.resumeConsumption(); } + @Test(expected = IllegalStateException.class) + public void testReleasedChannelAnnounceBufferSize() throws Exception { + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel remoteChannel = createRemoteInputChannel(inputGate); + + remoteChannel.releaseAllResources(); + remoteChannel.announceBufferSize(10); + } + @Test public void testOnUpstreamBlockedAndResumed() throws Exception { BufferPool bufferPool = new TestBufferPool(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index a14d499..09e3bc2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -78,6 +78,8 @@ import static java.util.Arrays.asList; import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultSubpartitionView; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; @@ -854,6 +856,37 @@ public class SingleInputGateTest extends InputGateTestBase { } @Test + public void testAnnounceBufferSize() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(2); + final LocalInputChannel localChannel = + createLocalInputChannel( + inputGate, + new TestingResultPartitionManager(createResultSubpartitionView())); + RemoteInputChannel remoteInputChannel = createRemoteInputChannel(inputGate, 1); + + inputGate.setInputChannels(localChannel, remoteInputChannel); + inputGate.requestPartitions(); + + inputGate.announceBufferSize(10); + + // Release all channels and gate one by one. + + localChannel.releaseAllResources(); + + inputGate.announceBufferSize(11); + + remoteInputChannel.releaseAllResources(); + + inputGate.announceBufferSize(12); + + inputGate.close(); + + inputGate.announceBufferSize(13); + + // No exceptions should happen. + } + + @Test public void testInputGateRemovalFromNettyShuffleEnvironment() throws Exception { NettyShuffleEnvironment network = createNettyShuffleEnvironment(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java index 5672db9..d1b3dab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java @@ -96,7 +96,9 @@ public class BufferDebloater { lastBufferSize = newSize; for (IndexedInputGate inputGate : inputGates) { - inputGate.announceBufferSize(newSize); + if (!inputGate.isFinished()) { + inputGate.announceBufferSize(newSize); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java index dc1dd2d..1fe7b53 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java @@ -218,7 +218,7 @@ public class BufferDebloaterTest extends TestLogger { public TestBufferSizeInputGate(int bufferInUseCount) { // Number of channels don't make sense here because - super(1, Collections.emptyList()); + super(1, Collections.emptyList(), false); this.bufferInUseCount = bufferInUseCount; }