[ https://issues.apache.org/jira/browse/FLINK-25664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cai Liuyang closed FLINK-25664. ------------------------------- Resolution: Abandoned > Notify will be not triggered for PipelinedSubpartition if more than one > buffer is added during isBlocked == true > ---------------------------------------------------------------------------------------------------------------- > > Key: FLINK-25664 > URL: https://issues.apache.org/jira/browse/FLINK-25664 > Project: Flink > Issue Type: Bug > Components: Runtime / Network > Affects Versions: 1.14.3 > Reporter: Cai Liuyang > Priority: Critical > Labels: pull-request-available > > For now, there might be case like: > # PipelinedSubPartition only have one aligned-chk-barried-buffer (isBlocked > == false) > # CreditBasedSequenceNumberingViewReader pool this buffer and > PipelinedSubPartition become to Blocked (isBlocked == true) > # Before downStream resumeConsumption, we add two finished-buffer to this > PipelinedSubPartition (there is no limit for adding buffer to > blocked-PipelinedSubPartition) > ## add the first finished-buffer will not notifyDataAvailable because > isBlocked == true > ## add the second finished-buffer will also not notifyDataAvailable because > of isBlocked == true and finishedBuffer > 1 > # DownStream resumeConsumption, PipelinedSubPartition is unblocked > (isBlocked == false) > # OutputFlusher call PipelinedSubPartition will not notifyDataAvailable > because of finishedBuffer > 1 > In conclusion,There are three case we should trigger notifyDataAvailable: > case1: only have one finished buffer (handled by add) > case2: only have one unfinished buffer (handled by flush) > case3: have more than on finished buffer, which is add during > PipelinedSubPartition is blocked (not handled) > {code:java} > // test code for this case > // add this test case to > org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest > > @Test > public void > testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption() > throws Exception { > blockSubpartitionByCheckpoint(1); > subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); > subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); > assertEquals(1, availablityListener.getNumNotifications()); > readView.resumeConsumption(); > subpartition.flush(); > assertEquals(2, availablityListener.getNumNotifications()); > } {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)