This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8319bf4 [FLINK-23223] Notifies if there are available data on resumption for pipelined subpartition 8319bf4 is described below commit 8319bf44b1561a4b69851b105fd379dec161e675 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Sun Jul 4 22:17:42 2021 +0800 [FLINK-23223] Notifies if there are available data on resumption for pipelined subpartition --- .../network/partition/PipelinedSubpartition.java | 9 ++--- .../PipelinedSubpartitionWithReadViewTest.java | 47 ++++++++++++++++++---- .../partition/consumer/LocalInputChannelTest.java | 37 +++++++++++++++++ 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index e13f413..134ef68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -463,11 +463,10 @@ public class PipelinedSubpartition extends ResultSubpartition } // if there is more then 1 buffer, we already notified the reader // (at the latest when adding the second buffer) - notifyDataAvailable = - !isBlocked - && buffers.size() == 1 - && buffers.peek().getBufferConsumer().isDataAvailable(); - flushRequested = buffers.size() > 1 || notifyDataAvailable; + boolean isDataAvailableInUnfinishedBuffer = + buffers.size() == 1 && buffers.peek().getBufferConsumer().isDataAvailable(); + notifyDataAvailable = !isBlocked && isDataAvailableInUnfinishedBuffer; + flushRequested = buffers.size() > 1 || isDataAvailableInUnfinishedBuffer; } if (notifyDataAvailable) { notifyDataAvailable(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index 972455b..de1b32b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -487,8 +487,7 @@ public class PipelinedSubpartitionWithReadViewTest { } @Test - public void testBlockedByCheckpointAndResumeConsumption() - throws IOException, InterruptedException { + public void testResumeBlockedSubpartitionWithEvents() throws IOException, InterruptedException { blockSubpartitionByCheckpoint(1); // add an event after subpartition blocked @@ -496,33 +495,67 @@ public class PipelinedSubpartitionWithReadViewTest { // no data available notification after adding an event checkNumNotificationsAndAvailability(1); + // Resumption will make the subpartition available. resumeConsumptionAndCheckAvailability(0, true); assertNextEvent(readView, BUFFER_SIZE, null, false, 0, false, true); + } - blockSubpartitionByCheckpoint(2); + @Test + public void testResumeBlockedSubpartitionWithUnfinishedBufferFlushed() + throws IOException, InterruptedException { + blockSubpartitionByCheckpoint(1); // add a buffer and flush the subpartition subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); subpartition.flush(); // no data available notification after adding a buffer and flushing the subpartition - checkNumNotificationsAndAvailability(2); + checkNumNotificationsAndAvailability(1); - resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false); + // Resumption will make the subpartition available. + resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true); assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true); + } + + @Test + public void testResumeBlockedSubpartitionWithUnfinishedBufferNotFlushed() + throws IOException, InterruptedException { + blockSubpartitionByCheckpoint(1); + + // add a buffer but not flush the subpartition. + subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); + // no data available notification after adding a buffer. + checkNumNotificationsAndAvailability(1); + + // Resumption will not make the subpartition available since the data is not flushed before. + resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false); + } - blockSubpartitionByCheckpoint(3); + @Test + public void testResumeBlockedSubpartitionWithFinishedBuffers() + throws IOException, InterruptedException { + blockSubpartitionByCheckpoint(1); // add two buffers to the subpartition subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); // no data available notification after adding the second buffer - checkNumNotificationsAndAvailability(3); + checkNumNotificationsAndAvailability(1); + // Resumption will make the subpartition available. resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true); assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true); assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true); } + @Test + public void testResumeBlockedEmptySubpartition() throws IOException, InterruptedException { + blockSubpartitionByCheckpoint(1); + + // Resumption will not make the subpartition available since it is empty. + resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false); + assertNoNextBuffer(readView); + } + // ------------------------------------------------------------------------ private void blockSubpartitionByCheckpoint(int numNotifications) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index a6c2c57..a48a645 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -523,6 +523,43 @@ public class LocalInputChannelTest { } @Test + public void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException { + PipelinedResultPartition parent = + (PipelinedResultPartition) + PartitionTestUtils.createPartition( + ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE); + ResultSubpartition subpartition = parent.getAllPartitions()[0]; + ResultSubpartitionView subpartitionView = subpartition.createReadView(() -> {}); + + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager(subpartitionView); + LocalInputChannel channel = + createLocalInputChannel(new SingleInputGateBuilder().build(), partitionManager); + channel.requestSubpartition(0); + + // Block the subpartition + subpartition.add( + EventSerializer.toBufferConsumer( + new CheckpointBarrier( + 1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()), + false)); + assertTrue(channel.getNextBuffer().isPresent()); + + // Add more data + subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096)); + subpartition.flush(); + + // No buffer since the subpartition is blocked. + assertFalse(channel.inputGate.pollNext().isPresent()); + + // Resumption makes the subpartition available. + channel.resumeConsumption(); + Optional<BufferOrEvent> nextBuffer = channel.inputGate.pollNext(); + assertTrue(nextBuffer.isPresent()); + assertTrue(nextBuffer.get().isBuffer()); + } + + @Test public void testCheckpointingInflightData() throws Exception { SingleInputGate inputGate = new SingleInputGateBuilder().build();