[ https://issues.apache.org/jira/browse/FLINK-14118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935947#comment-16935947 ]
Yingjie Cao commented on FLINK-14118:
-------------------------------------

[~pnowojski] You are right. High data skew can reproduce the problem. My test case uses 1000 output channels and a 1ms flushing interval, and writes most of the data to channel 0. The throughput results are as follows.

Before the fix:

Benchmark                                                            (channelsFlushTimeout)   Mode  Cnt      Score      Error  Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt   30  18240.197 ± 1892.419  ops/ms

After the fix:

Benchmark                                                            (channelsFlushTimeout)   Mode  Cnt      Score      Error  Units
StreamNetworkThroughputDataSkewBenchmarkExecutor.networkThroughput   1000,1ms                thrpt   30  24532.313 ± 1118.312  ops/ms

Some other cases were also tested (all with a 1ms flushing interval) and no evident performance difference was observed. The test results are as follows.

Before the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score     Error  Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt   30  23032.384 ±  871.883  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   1923.863 ±   78.518  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3377.401 ±  216.982  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6091.213 ±   92.658  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9107.194 ±  211.169  ops/ms

After the fix:

Benchmark                                                    (channelsFlushTimeout)   Mode  Cnt      Score     Error  Units
StreamNetworkThroughputBenchmarkExecutor.networkThroughput   1000,1ms                thrpt   30  23985.588 ±  990.037  ops/ms
KeyByBenchmarks.arrayKeyBy1MS                                                        thrpt   30   2011.356 ±   40.347  ops/ms
KeyByBenchmarks.tupleKeyBy1MS                                                        thrpt   30   3440.238 ±  211.906  ops/ms
InputBenchmark.mapRebalanceMapSink1MS                                                thrpt   30   6118.888 ±   94.517  ops/ms
InputBenchmark.mapSinkBufferTimeout1MS                                               thrpt   30   9120.144 ±  252.023  ops/ms

The extra synchronization point does not introduce any regression in the above test cases. I guess the reason is that the check sits inside a synchronized block, which already requires a memory barrier.
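To make the idea concrete, here is a minimal hypothetical sketch (not the actual Flink code; class and method names are invented for illustration) of the fix's core: track whether any new data has been added since the last flush, and skip the "data available" notification to the netty thread when nothing new is buffered. The check lives inside the synchronized block that the write path already takes, which is why it adds no extra memory barrier.

```java
// Hypothetical sketch of the fix: only notify the consumer (netty thread)
// when new data was actually added since the last flush.
class SubpartitionSketch {
    private final Object lock = new Object();
    private boolean newDataSinceLastFlush = false;
    private int notifications = 0;

    // called from the task thread when a record is written
    void add(Object record) {
        synchronized (lock) {
            newDataSinceLastFlush = true;
        }
    }

    // called periodically by the output flusher
    void flush() {
        synchronized (lock) {
            // extra check sits inside the already-existing synchronized
            // block, so no additional memory barrier is introduced
            if (!newDataSinceLastFlush) {
                return; // nothing to send: do not wake the netty thread
            }
            newDataSinceLastFlush = false;
            notifications++; // stands in for notifyDataAvailable()
        }
    }

    int notificationCount() {
        return notifications;
    }
}
```

With this check, repeated flush timer ticks over an idle subpartition produce no wakeups at all, which is exactly the behavior that removed the 20%-40% CPU overhead mentioned in the issue description.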
Moving the output flushing logic to the mailbox is a good choice, though, as you mentioned, the main concern is how to implement the "flushAll" mailbox action efficiently. I wonder if the mailbox facility will be introduced in versions 1.8 and 1.9. If not, I would suggest backporting the fix to 1.8 and 1.9.

> Reduce the unnecessary flushing when there is no data available for flush
> -------------------------------------------------------------------------
>
>                 Key: FLINK-14118
>                 URL: https://issues.apache.org/jira/browse/FLINK-14118
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.10.0, 1.9.1, 1.8.3
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new flush implementation, which works by triggering a netty user event, may cause a performance regression compared to the old synchronization-based one. More specifically, when there is exactly one BufferConsumer in the buffer queue of a subpartition and no new data will be added for a while (perhaps because there is simply no input, or because the operator collects some data for processing and does not emit records immediately), that is, when there is no data to send, the OutputFlusher will continuously notify data availability and wake up the netty thread, even though no data will be returned by the pollBuffer method.
> For some of our production jobs, this incurs 20% to 40% CPU overhead compared to the old implementation. We tried to fix the problem by checking whether there is new data available when flushing; if there is none, the netty thread is not notified. This works for our jobs and the CPU usage falls back to its previous level.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
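The wasteful behavior described in the quoted issue can be illustrated with a small hypothetical sketch (invented names, not Flink's actual classes): a periodic flusher that notifies unconditionally wakes the consumer on every tick, even when pollBuffer has nothing to return.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of the old behavior: every flush timer tick
// wakes the consumer thread, regardless of whether data is buffered.
class FlusherSketch {
    private final Queue<String> buffers = new ArrayDeque<>();
    int wakeups = 0;    // times the consumer thread was woken
    int emptyPolls = 0; // wakeups where pollBuffer() returned nothing

    void addBuffer(String data) {
        buffers.add(data);
    }

    // old behavior: always notify, then poll
    void onFlushTimer() {
        wakeups++;                 // unconditional notification
        if (pollBuffer() == null) {
            emptyPolls++;          // wasted wakeup: no data to send
        }
    }

    String pollBuffer() {
        return buffers.poll();
    }
}
```

On an idle subpartition, every timer tick becomes a wasted wakeup, which is the source of the reported 20% to 40% CPU overhead at a 1ms flushing interval.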