[ https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935947#comment-16935947 ]

Yingjie Cao commented on FLINK-14118:
-------------------------------------

[~pnowojski] You are right. High data skew can reproduce the problem. My test 
case uses 1000 output channels and a 1 ms flush interval, and writes most of 
the data to channel 0. The throughput results are as follows.

Before the fix:

Benchmark                                                            (channelsFlushTimeout)   Mode  Cnt      Score       Error   Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput                1000,1ms   thrpt   30  18240.197 ± 1892.419  ops/ms

After the fix:

Benchmark                                                            (channelsFlushTimeout)   Mode  Cnt      Score       Error   Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput                1000,1ms   thrpt   30  24532.313 ± 1118.312  ops/ms
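For context, the skew in the test case above is conceptually along the 
following lines (an illustrative sketch only, not the actual benchmark code; 
the 1000 channels match the test setup, while the exact skew ratio is an 
assumption):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the data skew in the test: 1000 output channels, with
// the vast majority of records routed to channel 0. The 99.5% ratio is an
// assumption for illustration, not the benchmark's actual distribution.
final class SkewedChannelSelector {

    private final int numberOfChannels;

    SkewedChannelSelector(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel() {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        // ~99.5% of the records go to channel 0, the rest are spread randomly.
        return rnd.nextDouble() < 0.995 ? 0 : rnd.nextInt(numberOfChannels);
    }
}
{code}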

Some other cases were also tested (all with a 1 ms flush interval) and no 
evident performance difference was observed. The results are as follows.

Before the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                1000,1ms   thrpt   30  23032.384 ±  871.883  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   1923.863 ±   78.518  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3377.401 ±  216.982  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6091.213 ±   92.658  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9107.194 ±  211.169  ops/ms

After the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput                1000,1ms   thrpt   30  23985.588 ±  990.037  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   2011.356 ±   40.347  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3440.238 ±  211.906  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6118.888 ±   94.517  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9120.144 ±  252.023  ops/ms

The extra synchronization point does not introduce any regression in the above 
test cases. I guess the reason is that the check sits inside the already 
existing synchronized block, which needs a memory barrier anyway, so no 
additional barrier is paid on the hot path.
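To illustrate what I mean, here is a minimal, simplified sketch of the idea 
(all names are made up for illustration; this is not the actual patch):

{code:java}
import java.util.ArrayDeque;

// Simplified sketch: flush() only notifies the netty thread when there really
// is unconsumed data, and the check runs inside the already existing
// synchronized block, so no additional memory barrier is introduced.
final class SketchSubpartition {

    // Stand-in for a BufferConsumer: only tracks whether readable data exists.
    static final class SketchBuffer {
        boolean hasUnconsumedData;
    }

    private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();
    private boolean flushRequested;

    void flush() {
        final boolean notifyDataAvailable;
        synchronized (buffers) {
            if (buffers.isEmpty() || flushRequested) {
                return; // nothing to flush, or a flush is already pending
            }
            // With more than one queued buffer the reader was already notified
            // when the later buffers were added; with exactly one buffer we
            // only notify if it actually contains unconsumed data.
            notifyDataAvailable = buffers.size() == 1 && buffers.peek().hasUnconsumedData;
            flushRequested = buffers.size() > 1 || notifyDataAvailable;
        }
        if (notifyDataAvailable) {
            notifyReaderDataAvailable();
        }
    }

    private void notifyReaderDataAvailable() {
        // in the real code this triggers the netty user event that wakes up
        // the netty thread
    }
}
{code}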

Moving the output flushing logic into the mailbox is a good choice, though, as 
you mentioned, the main concern is how to implement the "flushAll" mailbox 
action efficiently.

I wonder whether the mailbox facility will be introduced in versions 1.8 and 
1.9. If not, I would suggest back-porting the fix to 1.8 and 1.9.

> Reduce the unnecessary flushing when there is no data available for flush
> -------------------------------------------------------------------------
>
>                 Key: FLINK-14118
>                 URL: https://issues.apache.org/jira/browse/FLINK-14118
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.10.0, 1.9.1, 1.8.3
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new flush implementation, which works by triggering a netty user event, 
> may cause a performance regression compared to the old synchronization-based 
> one. More specifically, when there is exactly one BufferConsumer in the 
> buffer queue of a subpartition and no new data will be added for a while 
> (perhaps because there is simply no input, or because the operator collects 
> some data before processing and does not emit records immediately), i.e. 
> there is no data to send, the OutputFlusher will continuously signal data 
> availability and wake up the netty thread, even though the pollBuffer method 
> returns no data.
> For some of our production jobs, this incurs 20% to 40% CPU overhead 
> compared to the old implementation. We tried to fix the problem by checking 
> whether there is new data available when flushing; if there is no new data, 
> the netty thread is not notified. This works for our jobs and the CPU usage 
> falls back to the previous level.



