This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8319bf4  [FLINK-23223] Notifies if there are available data on 
resumption for pipelined subpartition
8319bf4 is described below

commit 8319bf44b1561a4b69851b105fd379dec161e675
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Sun Jul 4 22:17:42 2021 +0800

    [FLINK-23223] Notifies if there are available data on resumption for 
pipelined subpartition
---
 .../network/partition/PipelinedSubpartition.java   |  9 ++---
 .../PipelinedSubpartitionWithReadViewTest.java     | 47 ++++++++++++++++++----
 .../partition/consumer/LocalInputChannelTest.java  | 37 +++++++++++++++++
 3 files changed, 81 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index e13f413..134ef68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -463,11 +463,10 @@ public class PipelinedSubpartition extends 
ResultSubpartition
             }
             // if there is more then 1 buffer, we already notified the reader
             // (at the latest when adding the second buffer)
-            notifyDataAvailable =
-                    !isBlocked
-                            && buffers.size() == 1
-                            && 
buffers.peek().getBufferConsumer().isDataAvailable();
-            flushRequested = buffers.size() > 1 || notifyDataAvailable;
+            boolean isDataAvailableInUnfinishedBuffer =
+                    buffers.size() == 1 && 
buffers.peek().getBufferConsumer().isDataAvailable();
+            notifyDataAvailable = !isBlocked && 
isDataAvailableInUnfinishedBuffer;
+            flushRequested = buffers.size() > 1 || 
isDataAvailableInUnfinishedBuffer;
         }
         if (notifyDataAvailable) {
             notifyDataAvailable();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 972455b..de1b32b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -487,8 +487,7 @@ public class PipelinedSubpartitionWithReadViewTest {
     }
 
     @Test
-    public void testBlockedByCheckpointAndResumeConsumption()
-            throws IOException, InterruptedException {
+    public void testResumeBlockedSubpartitionWithEvents() throws IOException, 
InterruptedException {
         blockSubpartitionByCheckpoint(1);
 
         // add an event after subpartition blocked
@@ -496,33 +495,67 @@ public class PipelinedSubpartitionWithReadViewTest {
         // no data available notification after adding an event
         checkNumNotificationsAndAvailability(1);
 
+        // Resumption will make the subpartition available.
         resumeConsumptionAndCheckAvailability(0, true);
         assertNextEvent(readView, BUFFER_SIZE, null, false, 0, false, true);
+    }
 
-        blockSubpartitionByCheckpoint(2);
+    @Test
+    public void testResumeBlockedSubpartitionWithUnfinishedBufferFlushed()
+            throws IOException, InterruptedException {
+        blockSubpartitionByCheckpoint(1);
 
         // add a buffer and flush the subpartition
         subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
         subpartition.flush();
         // no data available notification after adding a buffer and flushing 
the subpartition
-        checkNumNotificationsAndAvailability(2);
+        checkNumNotificationsAndAvailability(1);
 
-        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+        // Resumption will make the subpartition available.
+        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
         assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
+    }
+
+    @Test
+    public void testResumeBlockedSubpartitionWithUnfinishedBufferNotFlushed()
+            throws IOException, InterruptedException {
+        blockSubpartitionByCheckpoint(1);
+
+        // add a buffer but not flush the subpartition.
+        subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
+        // no data available notification after adding a buffer.
+        checkNumNotificationsAndAvailability(1);
+
+        // Resumption will not make the subpartition available since the data 
is not flushed before.
+        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+    }
 
-        blockSubpartitionByCheckpoint(3);
+    @Test
+    public void testResumeBlockedSubpartitionWithFinishedBuffers()
+            throws IOException, InterruptedException {
+        blockSubpartitionByCheckpoint(1);
 
         // add two buffers to the subpartition
         subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
         subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
         // no data available notification after adding the second buffer
-        checkNumNotificationsAndAvailability(3);
+        checkNumNotificationsAndAvailability(1);
 
+        // Resumption will make the subpartition available.
         resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
         assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
         assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
     }
 
+    @Test
+    public void testResumeBlockedEmptySubpartition() throws IOException, 
InterruptedException {
+        blockSubpartitionByCheckpoint(1);
+
+        // Resumption will not make the subpartition available since it is 
empty.
+        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
+        assertNoNextBuffer(readView);
+    }
+
     // ------------------------------------------------------------------------
 
     private void blockSubpartitionByCheckpoint(int numNotifications)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a6c2c57..a48a645 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -523,6 +523,43 @@ public class LocalInputChannelTest {
     }
 
     @Test
+    public void testEnqueueAvailableChannelWhenResuming() throws IOException, 
InterruptedException {
+        PipelinedResultPartition parent =
+                (PipelinedResultPartition)
+                        PartitionTestUtils.createPartition(
+                                ResultPartitionType.PIPELINED, 
NoOpFileChannelManager.INSTANCE);
+        ResultSubpartition subpartition = parent.getAllPartitions()[0];
+        ResultSubpartitionView subpartitionView = 
subpartition.createReadView(() -> {});
+
+        TestingResultPartitionManager partitionManager =
+                new TestingResultPartitionManager(subpartitionView);
+        LocalInputChannel channel =
+                createLocalInputChannel(new SingleInputGateBuilder().build(), 
partitionManager);
+        channel.requestSubpartition(0);
+
+        // Block the subpartition
+        subpartition.add(
+                EventSerializer.toBufferConsumer(
+                        new CheckpointBarrier(
+                                1, 1, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                        false));
+        assertTrue(channel.getNextBuffer().isPresent());
+
+        // Add more data
+        
subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
+        subpartition.flush();
+
+        // No buffer since the subpartition is blocked.
+        assertFalse(channel.inputGate.pollNext().isPresent());
+
+        // Resumption makes the subpartition available.
+        channel.resumeConsumption();
+        Optional<BufferOrEvent> nextBuffer = channel.inputGate.pollNext();
+        assertTrue(nextBuffer.isPresent());
+        assertTrue(nextBuffer.get().isBuffer());
+    }
+
+    @Test
     public void testCheckpointingInflightData() throws Exception {
         SingleInputGate inputGate = new SingleInputGateBuilder().build();
 

Reply via email to