[https://issues.apache.org/jira/browse/BEAM-9566?focusedWorklogId=410293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-410293]

ASF GitHub Bot logged work on BEAM-9566:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Mar/20 14:08
            Start Date: 26/Mar/20 14:08
    Worklog Time Spent: 10m 
    Work Description: mxm commented on pull request #11237: [BEAM-9566] Mitigate performance issue for output timestamp watermark holds
URL: https://github.com/apache/beam/pull/11237
 
 
   Benchmarks have shown that the minimum output timestamp computation over all
   timers is very expensive because we have to iterate over all keys in the state
   backend. This is especially costly for RocksDB.
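To make the cost concrete, here is a minimal Java sketch of the pattern being replaced, modeled loosely on the `pendingTimersById.values()` scan described in BEAM-9566. The class `NaiveHoldTracker` and its fields are hypothetical, and a plain `HashMap` stands in for the state backend, so this only illustrates the access pattern, not the actual Flink or Beam code. Every timer change rescans all pending timers, so setting n timers touches n(n+1)/2 entries in total.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the costly pattern: every timer change triggers a full
// scan over all pending timers to recompute the minimum output timestamp.
class NaiveHoldTracker {
  // In-memory stand-in for a state-backed map such as "pendingTimersById":
  // timer id -> output timestamp in milliseconds.
  private final Map<String, Long> pendingOutputTimestamps = new HashMap<>();
  long valuesVisited = 0;
  long hold = Long.MAX_VALUE;

  void setTimer(String timerId, long outputTimestamp) {
    pendingOutputTimestamps.put(timerId, outputTimestamp);
    updateHold();
  }

  void deleteTimer(String timerId) {
    pendingOutputTimestamps.remove(timerId);
    updateHold();
  }

  // O(number of pending timers) per call. With a RocksDB backend each visited
  // entry roughly means one native iterator step plus deserialization.
  private void updateHold() {
    long min = Long.MAX_VALUE;
    for (long ts : pendingOutputTimestamps.values()) {
      valuesVisited++;
      min = Math.min(min, ts);
    }
    hold = min;
  }

  public static void main(String[] args) {
    NaiveHoldTracker tracker = new NaiveHoldTracker();
    int n = 1000;
    for (int i = 0; i < n; i++) {
      tracker.setTimer("timer-" + i, 1_000L + i);
    }
    // Setting n timers visits 1 + 2 + ... + n = n * (n + 1) / 2 entries in total.
    System.out.println(tracker.valuesVisited); // 500500
    System.out.println(tracker.hold); // 1000
  }
}
```

If the scan runs several times per `setTimer` or `deleteTimer` call, as the issue reports, the quadratic total is multiplied accordingly.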
   
   This change introduces a cache which stores and updates the timer output
   timestamps such that the minimum can be retrieved efficiently.
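One way such a cache could work, sketched in Java under the assumption that every pending timer has a unique string id: keep a map from timer id to output timestamp next to a sorted multiset (a `TreeMap` from timestamp to count). Setting, resetting, or deleting a timer then costs O(log n), and the minimum is simply the first key. `TimerOutputTimestampCache` and its methods are hypothetical names for illustration, not the classes introduced by this pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: caches the output timestamp of every pending timer so the
// minimum (the watermark hold) is available without scanning the state backend.
class TimerOutputTimestampCache {
  // timer id -> output timestamp (millis)
  private final Map<String, Long> timestampsById = new HashMap<>();
  // output timestamp -> number of timers with that timestamp (a sorted multiset)
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  /** Records a new timer or updates the output timestamp of an existing one. */
  public void put(String timerId, long outputTimestamp) {
    Long previous = timestampsById.put(timerId, outputTimestamp);
    if (previous != null) {
      decrement(previous);
    }
    counts.merge(outputTimestamp, 1, Integer::sum);
  }

  /** Removes a timer, e.g. when it fires or is deleted. */
  public void remove(String timerId) {
    Long previous = timestampsById.remove(timerId);
    if (previous != null) {
      decrement(previous);
    }
  }

  /** Returns the minimum output timestamp, or Long.MAX_VALUE if no timer is pending. */
  public long minOutputTimestamp() {
    return counts.isEmpty() ? Long.MAX_VALUE : counts.firstKey();
  }

  private void decrement(long timestamp) {
    // Drop the entry once no pending timer uses this timestamp any more.
    counts.computeIfPresent(timestamp, (ts, n) -> n == 1 ? null : n - 1);
  }

  public static void main(String[] args) {
    TimerOutputTimestampCache cache = new TimerOutputTimestampCache();
    cache.put("a", 30L);
    cache.put("b", 10L);
    cache.put("c", 20L);
    System.out.println(cache.minOutputTimestamp()); // 10
    cache.put("b", 40L); // reset timer "b" with a later output timestamp
    System.out.println(cache.minOutputTimestamp()); // 20
    cache.remove("c");
    System.out.println(cache.minOutputTimestamp()); // 30
  }
}
```

The count per timestamp matters because several timers can share one output timestamp; removing one of them must not release the hold while the others are still pending.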
   
   The cache is populated from the state backend only at startup. This should
   give us roughly the same performance as before output timestamps were
   introduced for timers.
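A minimal sketch of the "populate once at startup" step, assuming a hypothetical `CountingTimerState` as an in-memory stand-in for the persisted timers that records how many entries each full scan touches. The hypothetical `StartupHoldCache` scans the state a single time in its constructor, and every later minimum query is answered from memory. Incremental updates on timer changes are left out to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the persisted timer state; it records how
// many entries each full scan touches so the cost of scanning is visible.
class CountingTimerState {
  final Map<String, Long> outputTimestamps = new HashMap<>();
  long entriesRead = 0;

  Iterable<Map.Entry<String, Long>> scanAll() {
    entriesRead += outputTimestamps.size();
    return outputTimestamps.entrySet();
  }
}

// Hypothetical sketch: build the cache with a single scan when the operator
// starts (e.g. after restoring from a checkpoint). Incremental updates on
// setTimer/deleteTimer are omitted here.
class StartupHoldCache {
  // output timestamp -> number of pending timers with that timestamp
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  StartupHoldCache(CountingTimerState state) {
    for (Map.Entry<String, Long> entry : state.scanAll()) {
      counts.merge(entry.getValue(), 1, Integer::sum);
    }
  }

  long minOutputTimestamp() {
    return counts.isEmpty() ? Long.MAX_VALUE : counts.firstKey();
  }

  public static void main(String[] args) {
    CountingTimerState state = new CountingTimerState();
    for (int i = 0; i < 1000; i++) {
      state.outputTimestamps.put("timer-" + i, 5_000L - i);
    }
    StartupHoldCache cache = new StartupHoldCache(state);
    long hold = 0;
    for (int i = 0; i < 100; i++) {
      hold = cache.minOutputTimestamp(); // served from memory, no state access
    }
    System.out.println(hold); // 4001
    System.out.println(state.entriesRead); // 1000
  }
}
```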
   
   The following benchmarks were run with RocksDB:
   
   Nexmark Query 12 before:
   ```
     Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
     0000         105.3                       950.0                        9039
     0001           2.2                      4466.3                         196
   ```
   
   Nexmark Query 12 with this change:
   ```
   Performance:
     Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
     0000           6.0                     16730.8                        1919
     0001           0.8                     12391.6                         196
   ```
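For reference, the speedups implied by the two Query 12 tables can be recomputed from their rows: runtime shrinks by a factor of about 17.5 for Conf 0000 and 2.75 for Conf 0001, and throughput rises by similar factors. The small Java helper below (hypothetical, purely arithmetic) performs this calculation on the reported numbers.

```java
// Hypothetical helper: ratios computed from the Conf 0000 and 0001 rows of the
// two Query 12 tables above.
class Query12Speedup {
  static double ratio(double numerator, double denominator) {
    return numerator / denominator;
  }

  public static void main(String[] args) {
    String[] confs = {"0000", "0001"};
    double[] runtimeBefore = {105.3, 2.2};
    double[] runtimeAfter = {6.0, 0.8};
    double[] eventsBefore = {950.0, 4466.3};
    double[] eventsAfter = {16730.8, 12391.6};
    for (int i = 0; i < confs.length; i++) {
      // Runtime speedup = before / after; throughput gain = after / before.
      System.out.printf("Conf %s: runtime %.2fx shorter, throughput %.2fx higher%n",
          confs[i],
          ratio(runtimeBefore[i], runtimeAfter[i]),
          ratio(eventsAfter[i], eventsBefore[i]));
    }
  }
}
```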
   
   For more details please see BEAM-9566.
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 410293)
    Remaining Estimate: 0h
            Time Spent: 10m

> Performance regression of FlinkRunner stream mode due to watermark holds update
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-9566
>                 URL: https://issues.apache.org/jira/browse/BEAM-9566
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing-nexmark
>    Affects Versions: 2.20.0
>            Reporter: Bingfeng Xia
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.20.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Nexmark tests show that the throughput of the FlinkRunner with the RocksDB state 
> backend dropped by 50% to 80% in Queries 4, 5, 6, 9, and 11. Other queries also 
> regressed, but less severely. All affected queries use keyed state.
>  
>  
> Nexmark test results. The tests were run on the same machine and the results 
> are reproducible.
>  - before regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           3.8                     26171.2                      100000
>   0001           3.9                     25967.3                       92000
>   0002           1.9                     53447.4                         351
>   0003           2.8                     35791.0                         580
>   0004           2.5                      4045.3                          40
>   0005           9.6                     10448.2                          12
>   0006           1.2                      8532.4                         401
>   0007           4.0                     25018.8                           1
>   0008           2.9                     34928.4                        6000
>   0009           1.1                      9066.2                         298
>   0010           9.5                     10564.1                           2
>   0011          11.1                      9005.0                        1919
>   0012           4.5                     22075.1                        1919
>   0013           4.4                     22547.9                       92000
>   0014           9.7                     10261.7                       92000
> {code}
>  - after regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           4.5                     22036.1                      100000
>   0001           3.9                     25839.8                       92000
>   0002           2.3                     43763.7                         351
>   0003           3.5                     28669.7                         580
>   0004           3.6                      2801.1                          40
>   0005          22.6                      4429.1                          12
>   0006           2.5                      3993.6                         401
>   0007           7.5                     13320.9                           1
>   0008           2.7                     36737.7                        6000
>   0009           2.5                      3930.8                         298
>   0010          16.2                      6178.6                           2
>   0011          82.9                      1206.3                        1919
>   0012           5.9                     16874.8                        1919
>   0013           4.2                     23889.2                       92000
> {code}
>  
> The regression comes from the "updateWatermarkHold()" function recently added 
> to the Flink DoFnOperator in 
> [PR#10534|https://github.com/apache/beam/pull/10534]:
> [https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
> Its purpose is to allow the FlinkRunner to set watermark holds. However, the 
> implementation is not efficient:
>  # "pendingTimersById" is an interface to the RocksDB key/value map state. 
> Iterating over all values via "pendingTimersById.values()" is very costly. The 
> CPU cycle profiling result of Query 4 below, captured with 
> [async-profiler|https://github.com/jvm-profiling-tools/async-profiler], shows 
> that most of the CPU time is spent in "RocksIterator.next()", called from 
> "pendingTimersById.values()".
>  # Another source of overhead is that this function is called multiple times 
> in each setTimer and deleteTimer call.
>  
> CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
> [https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
>  
>  
>  
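The size of the regression per query can be recomputed from the Events(/sec) columns of the two tables in the issue. The Java snippet below (a hypothetical helper, purely arithmetic) computes the relative drop; by this measure Query 4 lost about 31% of its throughput, Queries 5, 6, and 9 between about 53% and 58%, and Query 11 about 87%.

```java
// Hypothetical helper: relative throughput drop per query, using the Events(/sec)
// columns of the "before regression" and "after regression" tables.
class ThroughputDrop {
  // Returns the drop in percent; a positive value means throughput decreased.
  static double dropPercent(double before, double after) {
    return (before - after) / before * 100.0;
  }

  public static void main(String[] args) {
    String[] queries = {"4", "5", "6", "9", "11"};
    double[] before = {4045.3, 10448.2, 8532.4, 9066.2, 9005.0};
    double[] after = {2801.1, 4429.1, 3993.6, 3930.8, 1206.3};
    for (int i = 0; i < queries.length; i++) {
      System.out.printf("Query %s: %.1f%% lower throughput%n",
          queries[i], dropPercent(before[i], after[i]));
    }
  }
}
```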



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
