Cai Liuyang created FLINK-25664:
-----------------------------------
Summary: Notify will be not triggered for PipelinedSubpartition if
more than one buffer is added during isBlocked == true
Key: FLINK-25664
URL: https://issues.apache.org/jira/browse/FLINK-25664
Project: Flink
Issue Type: Bug
Reporter: Cai Liuyang
Currently, the following sequence can occur:
# PipelinedSubpartition holds only one buffer, an aligned checkpoint barrier
buffer (isBlocked == false)
# CreditBasedSequenceNumberingViewReader polls this buffer, and
PipelinedSubpartition becomes blocked (isBlocked == true)
# Before the downstream calls resumeConsumption, two finished buffers are added
to this PipelinedSubpartition (there is no limit on adding buffers to a
blocked PipelinedSubpartition)
## adding the first finished buffer does not trigger notifyDataAvailable
because isBlocked == true
## adding the second finished buffer does not trigger notifyDataAvailable
either, because isBlocked == true and finishedBuffer > 1
# The downstream calls resumeConsumption, and PipelinedSubpartition is
unblocked (isBlocked == false)
# OutputFlusher calls flush on PipelinedSubpartition, which does not trigger
notifyDataAvailable because finishedBuffer > 1
In conclusion, there are three cases in which notifyDataAvailable should be
triggered:
case1: there is only one finished buffer (handled by add)
case2: there is only one unfinished buffer (handled by flush)
case3: there is more than one finished buffer, added while
PipelinedSubpartition was blocked (not handled)
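The sequence and the two handled cases can be replayed in a small standalone model. This is only a sketch under stated assumptions: the class BlockedSubpartitionSketch, its Buf type, and the method bodies are hypothetical simplifications written for illustration, not the Flink implementation. It only encodes the notification rules as described in this report.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of the notification rules; not Flink code. */
class BlockedSubpartitionSketch {

    /** Hypothetical stand-in for a queued buffer. */
    static final class Buf {
        final boolean finished;
        final boolean barrier;

        Buf(boolean finished, boolean barrier) {
            this.finished = finished;
            this.barrier = barrier;
        }
    }

    private final Deque<Buf> buffers = new ArrayDeque<>();
    private boolean isBlocked;
    private int notifications;

    void add(Buf buf) {
        buffers.add(buf);
        // case1: notify only when unblocked and exactly one finished buffer is queued.
        if (!isBlocked && countFinished() == 1) {
            notifications++;
        }
    }

    Buf poll() {
        Buf buf = buffers.poll();
        // Assumption: polling an aligned checkpoint barrier blocks the subpartition.
        if (buf != null && buf.barrier) {
            isBlocked = true;
        }
        return buf;
    }

    void resumeConsumption() {
        isBlocked = false; // no notification is sent here
    }

    void flush() {
        // case2: notify only when unblocked and a single unfinished buffer is queued.
        if (!isBlocked && buffers.size() == 1 && !buffers.peek().finished) {
            notifications++;
        }
    }

    private int countFinished() {
        int n = 0;
        for (Buf b : buffers) {
            if (b.finished) {
                n++;
            }
        }
        return n;
    }

    /** Replays the five steps and returns the number of notifications sent. */
    static int replayScenario() {
        BlockedSubpartitionSketch sp = new BlockedSubpartitionSketch();
        sp.add(new Buf(true, true));   // step 1: barrier buffer, notified once
        sp.poll();                     // step 2: reader polls it, subpartition blocks
        sp.add(new Buf(true, false));  // step 3a: blocked, no notification
        sp.add(new Buf(true, false));  // step 3b: blocked, no notification
        sp.resumeConsumption();        // step 4: unblocked
        sp.flush();                    // step 5: two finished buffers, no notification
        return sp.notifications;
    }

    public static void main(String[] args) {
        System.out.println("notifications = " + replayScenario());
    }
}
```

In this model the reader is notified only for the barrier buffer. The two buffers added while blocked are never announced, which corresponds to case3.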
{code:java}
// Test code for this case. Add it to
// org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest;
// the final assertion is expected to fail while case3 is unhandled.
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
        throws Exception {
    blockSubpartitionByCheckpoint(1);

    // Both buffers are added while the subpartition is blocked, so neither add notifies.
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    assertEquals(1, availablityListener.getNumNotifications());

    // After unblocking, the reader should be notified about the queued data.
    readView.resumeConsumption();
    subpartition.flush();
    assertEquals(2, availablityListener.getNumNotifications());
}
{code}
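One possible way to satisfy the expectation in this test is to notify the reader when consumption resumes and data is already queued. The sketch below illustrates that idea in a standalone model. It is an assumption made for illustration, not necessarily the change that Flink adopts, and the class ResumeNotifySketch and its methods are hypothetical.

```java
/** Hypothetical sketch of handling case3 on resume; not the actual Flink change. */
class ResumeNotifySketch {
    private int finishedBuffers; // only finished buffers are modeled
    private boolean isBlocked;
    private int notifications;

    void addFinished() {
        finishedBuffers++;
        // case1: notify only when unblocked and this is the only finished buffer.
        if (!isBlocked && finishedBuffers == 1) {
            notifications++;
        }
    }

    void pollBarrier() {
        // Assumption: the polled buffer is a checkpoint barrier, so the subpartition blocks.
        finishedBuffers--;
        isBlocked = true;
    }

    void resumeConsumption() {
        isBlocked = false;
        // Assumed handling of case3: announce data that was queued while blocked.
        if (finishedBuffers > 0) {
            notifications++;
        }
    }

    void flush() {
        // case2 would notify for a single unfinished buffer; none are modeled here.
    }

    /** Returns the notification counts before and after resumeConsumption. */
    static int[] replayTest() {
        ResumeNotifySketch sp = new ResumeNotifySketch();
        sp.addFinished();   // barrier buffer, first notification
        sp.pollBarrier();   // subpartition is now blocked
        sp.addFinished();   // blocked, no notification
        sp.addFinished();   // blocked, no notification
        int before = sp.notifications;
        sp.resumeConsumption();
        sp.flush();
        return new int[] {before, sp.notifications};
    }
}
```

Under these assumptions the counts before and after resuming are 1 and 2, matching the two assertions in the test above.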