[ https://issues.apache.org/jira/browse/BEAM-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067704#comment-17067704 ]
Maximilian Michels commented on BEAM-9566:
------------------------------------------

Thank you [~Bingfeng Xia] for testing. The reason you are seeing worse performance is that BEAM-9573 fixed a bug and now iterates over all keys to find the minimum output timestamp. While some duplicate calculation calls have been removed, iterating over all keys still adds considerably more overhead than the previous (incorrect!) solution, which only iterated over the timers for the active key.

Admittedly, I had not tested this with the RocksDB state backend. Indeed, the performance is just terrible. I've opened a PR for this issue to add a caching layer for the output timestamp.

With the caching layer and the RocksDB state backend activated, I get the following numbers on my machine:

{noformat}
Performance:
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
  0000           2.5                 39494.5             100000
  0001           1.7                 58858.2              92000
  0002           0.5                189753.3                351
  0003           1.8                 55035.8                580
  0004           1.9                  5152.0                 40
  0005           8.4                 11953.1                 12
  0006           1.3                  7507.5                401
  0007           3.8                 26089.2                  1
  0008           1.3                 76452.6               6000
  0009           1.1                  9363.3                298
  0010           5.8                 17235.4                  2
  0011           7.2                 13980.1               1919
  0012           3.4                 29550.8               1919
  0013           3.7                 27352.3              92000
  0014           6.9                 14400.9              92000
{noformat}

> Performance regression of FlinkRunner stream mode due to watermark holds update
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-9566
>                 URL: https://issues.apache.org/jira/browse/BEAM-9566
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing-nexmark
>    Affects Versions: 2.20.0
>            Reporter: Bingfeng Xia
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.20.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Nexmark tests show that the throughput of FlinkRunner with the RocksDB state
> backend dropped by 50%~80% in Queries 4/5/6/9/11. Some other queries also
> dropped, but not as much as these. The affected queries use keyed state.
>
>
> Nexmark test results.
> Tests have been run on the same machine and can be reproduced.
> - before regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
>   0000           3.8                 26171.2             100000
>   0001           3.9                 25967.3              92000
>   0002           1.9                 53447.4                351
>   0003           2.8                 35791.0                580
>   0004           2.5                  4045.3                 40
>   0005           9.6                 10448.2                 12
>   0006           1.2                  8532.4                401
>   0007           4.0                 25018.8                  1
>   0008           2.9                 34928.4               6000
>   0009           1.1                  9066.2                298
>   0010           9.5                 10564.1                  2
>   0011          11.1                  9005.0               1919
>   0012           4.5                 22075.1               1919
>   0013           4.4                 22547.9              92000
>   0014           9.7                 10261.7              92000
> {code}
> - after regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
>   0000           4.5                 22036.1             100000
>   0001           3.9                 25839.8              92000
>   0002           2.3                 43763.7                351
>   0003           3.5                 28669.7                580
>   0004           3.6                  2801.1                 40
>   0005          22.6                  4429.1                 12
>   0006           2.5                  3993.6                401
>   0007           7.5                 13320.9                  1
>   0008           2.7                 36737.7               6000
>   0009           2.5                  3930.8                298
>   0010          16.2                  6178.6                  2
>   0011          82.9                  1206.3               1919
>   0012           5.9                 16874.8               1919
>   0013           4.2                 23889.2              92000
> {code}
>
> The regression comes from the "updateWatermarkHold()" function recently added
> to the Flink DoFnOperator in
> [PR#10534|https://github.com/apache/beam/pull/10534]
> [https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
> Its purpose is to allow FlinkRunner to set watermark holds. However, the
> implementation is not efficient:
> # "pendingTimersById" is an interface to the RocksDB key/value map state.
> Iterating over all values via "pendingTimersById.values()" is costly. The
> CPU profiling result of Query 4 below, collected with
> [async-profiler|https://github.com/jvm-profiling-tools/async-profiler],
> also shows that most of the CPU time was spent in "RocksIterator.next()",
> triggered by "pendingTimersById.values()".
> # Another source of overhead is that this function is called multiple times
> in each setTimer and deleteTimer.
>
> CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
> [https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
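
For readers of this thread, the difference between scanning all pending timers and keeping a cached minimum output timestamp can be sketched as follows. This is only an illustration under stated assumptions, not the code from the PR: the class name `CachedMinTimestamp`, its methods, and the use of a `TreeMap` as the cache are hypothetical, and a plain `HashMap` stands in for the RocksDB-backed "pendingTimersById" state. A real implementation would also need to handle state restore and per-key scoping, which the sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a cached minimum output timestamp; not the Beam implementation. */
class CachedMinTimestamp {
  // Stand-in for the state-backed map (timer id -> output timestamp in millis).
  // With RocksDB, iterating over values() of such a map is the expensive part.
  private final Map<String, Long> pendingTimersById = new HashMap<>();

  // In-memory cache (an assumption of this sketch):
  // output timestamp -> number of pending timers that carry it.
  private final TreeMap<Long, Integer> timestampCounts = new TreeMap<>();

  /** Registers or overwrites a timer and keeps the cache in sync. */
  void setTimer(String timerId, long outputTimestamp) {
    Long previous = pendingTimersById.put(timerId, outputTimestamp);
    if (previous != null) {
      decrement(previous);
    }
    timestampCounts.merge(outputTimestamp, 1, Integer::sum);
  }

  /** Removes a timer if present and keeps the cache in sync. */
  void deleteTimer(String timerId) {
    Long previous = pendingTimersById.remove(timerId);
    if (previous != null) {
      decrement(previous);
    }
  }

  private void decrement(long timestamp) {
    Integer count = timestampCounts.get(timestamp);
    if (count == null) {
      return;
    }
    if (count == 1) {
      timestampCounts.remove(timestamp);
    } else {
      timestampCounts.put(timestamp, count - 1);
    }
  }

  /** Cached lookup: reads the smallest key without touching the timer map. */
  long minOutputTimestamp() {
    return timestampCounts.isEmpty() ? Long.MAX_VALUE : timestampCounts.firstKey();
  }

  /** Full scan over all pending timers, shown only for comparison. */
  long minOutputTimestampByScan() {
    long min = Long.MAX_VALUE;
    for (long timestamp : pendingTimersById.values()) {
      min = Math.min(min, timestamp);
    }
    return min;
  }
}
```

Both lookups return the same value; the cached variant pays a small bookkeeping cost in setTimer and deleteTimer so that finding the minimum no longer requires iterating over the state.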