[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618605#comment-16618605
 ] 

ASF GitHub Bot commented on FLINK-10331:
----------------------------------------

pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecessary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218324609
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ##########
 @@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
                try {
                        subpartition.add(createFilledBufferConsumer(1025)); // finished
                        subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+                       long oldNumNotifications = availablityListener.getNumNotifications();
                        subpartition.flush();
+                       // buffer queue is > 1, should already be notified, no further notification necessary
+                       assertThat(oldNumNotifications, greaterThan(0L));
+                       assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
                        assertNextBuffer(readView, 1025, true, 1, false, true);
                        assertNextBuffer(readView, 1024, false, 1, false, false);
+                       assertNoNextBuffer(readView);
+               } finally {
+                       subpartition.release();
+               }
+       }
+
+       /**
+        * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
 
 Review comment:
   Extract these initialisations into setup/teardown methods or a `@Rule`. This code block is duplicated a couple of times:
   
   ```
                final ResultSubpartition subpartition = createSubpartition();
                AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
                ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
                availablityListener.resetNotificationCounters();
   
                (...)
                } finally {
                        subpartition.release();
                }
   ```
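
   One possible shape for that extraction, sketched in plain Java so it compiles without JUnit or Flink on the classpath. `CountingListener`, `FakeSubpartition` and `Fixture` are hypothetical stand-ins, not Flink classes. In the real test the same fields would presumably live in the test class, be populated in a JUnit `@Before` method and be released in `@After`:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: every type in here is a hypothetical stand-in, not a Flink class. */
public class SubpartitionFixtureSketch {

    /** Stand-in for the availability listener; it only counts notifications. */
    static final class CountingListener {
        private final AtomicLong numNotifications = new AtomicLong();

        void notifyDataAvailable() { numNotifications.incrementAndGet(); }
        long getNumNotifications() { return numNotifications.get(); }
        void resetNotificationCounters() { numNotifications.set(0L); }
    }

    /** Stand-in for the subpartition; it only records whether it was released. */
    static final class FakeSubpartition {
        private final CountingListener listener;
        private boolean released;

        FakeSubpartition(CountingListener listener) { this.listener = listener; }

        void flush() { listener.notifyDataAvailable(); }
        void release() { released = true; }
        boolean isReleased() { return released; }
    }

    /** Bundles the duplicated initialisation and the release from the finally block. */
    static final class Fixture implements AutoCloseable {
        final CountingListener listener = new CountingListener();
        final FakeSubpartition subpartition = new FakeSubpartition(listener);

        Fixture() {
            // Mirrors the counter reset at the end of the duplicated block.
            listener.resetNotificationCounters();
        }

        @Override
        public void close() {
            // Replaces the hand-written try/finally in every test.
            subpartition.release();
        }
    }

    public static void main(String[] args) {
        Fixture fixture = new Fixture();
        try (Fixture f = fixture) {
            f.subpartition.flush();
            System.out.println("notifications: " + f.listener.getNumNotifications());
        }
        System.out.println("released: " + fixture.subpartition.isReleased());
    }
}
```

   Each test body then only contains the calls and assertions specific to that test, and the release can no longer be forgotten.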



> Fix unnecessary flush requests to the network stack
> ---------------------------------------------------
>
>                 Key: FLINK-10331
>                 URL: https://issues.apache.org/jira/browse/FLINK-10331
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>              Labels: pull-request-available
>
> With the re-design of the record writer interaction with the 
> result(sub)partitions, flush requests can currently pile up in these 
> scenarios:
> - a previous flush request has not been completely handled yet and/or is 
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need 
> a new notification
> These lead to increased notifications in low latency settings (low output 
> flusher intervals) which can be avoided.
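
The two scenarios above can be illustrated with a minimal sketch. This is not the code from the pull request: `SketchSubpartition` and its members are hypothetical, and the model assumes that the last queued buffer is always the unfinished one. It only shows how a pending-flush flag and the queue length could be used to skip redundant notifications:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch only: a hypothetical model of notification de-duplication, not Flink code. */
public class FlushNotificationSketch {

    static final class SketchSubpartition {
        private final Deque<String> buffers = new ArrayDeque<>();
        private boolean flushRequested;
        private long numNotifications;

        /** Enqueues a buffer; the last buffer in the queue is treated as unfinished. */
        synchronized void add(String buffer) {
            buffers.add(buffer);
            // A second buffer means the first one is finished and readable.
            // Skip the notification if a flush already announced the data.
            if (buffers.size() == 2 && !flushRequested) {
                numNotifications++;
            }
        }

        /** Requests a flush, notifying only if the consumer has not been told yet. */
        synchronized void flush() {
            if (buffers.isEmpty()) {
                return;
            }
            // More than one buffer: add() already notified the consumer.
            // flushRequested set: an earlier flush is still pending.
            boolean shouldNotify = !flushRequested && buffers.size() == 1;
            flushRequested = true;
            if (shouldNotify) {
                numNotifications++;
            }
        }

        /** Hands the next buffer to the consumer; returns null if the queue is empty. */
        synchronized String poll() {
            String buffer = buffers.poll();
            if (buffers.isEmpty()) {
                // Everything was consumed, so the next flush must notify again.
                flushRequested = false;
            }
            return buffer;
        }

        synchronized long getNumNotifications() {
            return numNotifications;
        }
    }

    public static void main(String[] args) {
        SketchSubpartition subpartition = new SketchSubpartition();
        subpartition.add("unfinished");
        subpartition.flush();
        subpartition.flush(); // repeated flush while the first is still pending
        System.out.println("notifications: " + subpartition.getNumNotifications());
    }
}
```

In this model a flush with exactly one queued buffer notifies once, while a flush behind an already finished buffer adds no notification, which is the behaviour the tests in the diff above assert.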



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)