[ 
https://issues.apache.org/jira/browse/FLINK-18433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146650#comment-17146650
 ] 

Arvid Heise edited comment on FLINK-18433 at 6/26/20, 9:25 PM:
---------------------------------------------------------------

Hi [~liyu], thanks for the update - I feared as much.

Without further information, I did run a particular comparison that may or may 
not help.

I picked TwoInputs_KeyBy_Eager_AtLeastOnce_100_heap as it had the biggest 
regression (-17.31%) and ran some tests on a single m5ad.2xlarge (with SSD, but 
state backend is heap). I built a flink-dist from release-1.10 and release-1.11.

Since there are no built-in evaluation metrics, I just used {{time}}. To reduce 
the impact of cluster setup and to really see if it's related to heap state 
backend or network stack, I simply executed on a local executor who took the 
full 8 cores and I gave it 5gb RAM (job doesn't need much and I wanted to avoid 
too much allocation overhead).

Full commands for reference:

{noformat}
time java -Xmx5g -Dlog.file=flink_10.log 
-Dlog4j.configuration=file:///`pwd`/flink-1.10/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf/log4j.properties
 -cp flink-basic-operations_2.11-1.10-SNAPSHOT.jar:"${flink_10[*]}" 
org.apache.flink.basic.operations.PerformanceTestJob --topologyName TwoInputs 
--LogicalAttributesofEdges KeyBy --ScheduleMode LazyFromSource --CheckpointMode 
AtLeastOnce --recordSize 100 --stateBackend heap --maxCount 1000000 
--checkpointInterval 100 --checkpointPath file:///home/ec2-user/spda/checkpoints
time java -Xmx5g -Dlog.file=`pwd`/flink_11.log 
-Dlog4j.configurationFile=file:///`pwd`/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/conf/log4j.properties
 -cp flink-basic-operations_2.11-1.11-SNAPSHOT.jar:"${flink_11[*]}" 
org.apache.flink.basic.operations.PerformanceTestJob --topologyName TwoInputs 
--LogicalAttributesofEdges KeyBy --ScheduleMode LazyFromSource --CheckpointMode 
AtLeastOnce --recordSize 100 --stateBackend heap --maxCount 1000000 
--checkpointInterval 100 --checkpointPath 
file:///home/ec2-user/spda/checkpoints 
--execution.checkpointing.tolerable-failed-checkpoints 1000
{noformat}

I modified the test job to compile and to create a local executor forwarding 
the parameters to configuration (more on that later).

I ran these commands interleaved for a few hours and got [this 
sheet|https://docs.google.com/spreadsheets/d/1NPoDQakQu1apdzWZfxD2IoRgo28MpBfD9s4K9Aq9nA4/edit?usp=sharing].
 On average, we have
Flink 1.10      01m59s
Flink 1.11      01m50s
Note that less is better in this case as we measure the time needed to process 
1m elements.

So TL;DR in this particular benchmark setup, it rather looks like performance 
actually improved. Note that DOP=8 is higher than what [~Aihua] used. Assuming 
that both benchmarks are okay I see 3 options to explain them.
# We may have a regression on local input channels, but an improvement for 
remote input channels. Since, remote input channels are usually the bottleneck, 
I'd say this is rather good, but ideally we can still remove the regression 
while keeping the improvement.
# Memory management in 1.11 works differently/incorrectly. My test excludes the 
memory management on TM/JM level, so that may be the root cause for the 
original regression.
# I experienced restarts due to failed checkpoints in the end. My first 
impression was that when the job is about to be finished may cause some 
in-progress checkpoints to be canceled which is propagated to checkpoint 
coordinator, which ultimately restarts the job because by default no checkpoint 
is allowed to fail. In my final setup, I ignored these errors, but it is 
obvious that any restart would impact the performance tremendously. In my 
setup, I even ran in some kind of live lock for 1m records (100k records didn't 
suffer from these issues oddly).

I'm attaching a log that shows this live lock. [~roman_khachatryan] 
investigated but couldn't find anything suspicious.

The key errors are

{noformat}
2020-06-26 14:53:09,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
        at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_252]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_252]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_252]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
2020-06-26 14:53:09,682 DEBUG 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - 
Notification of aborted checkpoint for task Flat Map (1/1)
2020-06-26 14:53:09,682 DEBUG 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Cleanup 
AsyncCheckpointRunnable for checkpoint 1 of Flat Map (1/1).
2020-06-26 14:53:09,683 DEBUG 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map 
(1/1) - asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.CancellationException: null
        at java.util.concurrent.FutureTask.report(FutureTask.java:121) 
~[?:1.8.0_252]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
~[?:1.8.0_252]
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
 [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_252]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
{noformat}

maybe [~pnowojski] or [~zjwang] have more ideas. [^flink_11.log.gz] 



was (Author: aheise):
Hi [~liyu], thanks for the update - I feared as much.

Without further information, I did run a particular comparison that may or may 
not help.

I picked TwoInputs_KeyBy_Eager_AtLeastOnce_100_heap as it had the biggest 
regression (-17.31%) and ran some tests on a single m5ad.2xlarge (with SSD, but 
state backend is heap). I built a flink-dist from release-1.10 and release-1.11.

Since there are no built-in evaluation metrics, I just used {{time}}. To reduce 
the impact of cluster setup and really see if it's related to heap state 
backend or network stack, I simply executed on a local executor who took the 
full 8 cores and I gave it 5gb RAM (job doesn't need much and I wanted to avoid 
too much allocation overhead).

Full commands for reference:

{noformat}
time java -Xmx5g -Dlog.file=flink_10.log 
-Dlog4j.configuration=file:///`pwd`/flink-1.10/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf/log4j.properties
 -cp flink-basic-operations_2.11-1.10-SNAPSHOT.jar:"${flink_10[*]}" 
org.apache.flink.basic.operations.PerformanceTestJob --topologyName TwoInputs 
--LogicalAttributesofEdges KeyBy --ScheduleMode LazyFromSource --CheckpointMode 
AtLeastOnce --recordSize 100 --stateBackend heap --maxCount 1000000 
--checkpointInterval 100 --checkpointPath file:///home/ec2-user/spda/checkpoints
time java -Xmx5g -Dlog.file=`pwd`/flink_11.log 
-Dlog4j.configurationFile=file:///`pwd`/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/conf/log4j.properties
 -cp flink-basic-operations_2.11-1.11-SNAPSHOT.jar:"${flink_11[*]}" 
org.apache.flink.basic.operations.PerformanceTestJob --topologyName TwoInputs 
--LogicalAttributesofEdges KeyBy --ScheduleMode LazyFromSource --CheckpointMode 
AtLeastOnce --recordSize 100 --stateBackend heap --maxCount 1000000 
--checkpointInterval 100 --checkpointPath 
file:///home/ec2-user/spda/checkpoints 
--execution.checkpointing.tolerable-failed-checkpoints 1000
{noformat}

I modified the test job to compile and to create a local executor forwarding 
the parameters to configuration (more on that later).

I ran these commands interleaved for a few hours and got [this 
sheet|https://docs.google.com/spreadsheets/d/1NPoDQakQu1apdzWZfxD2IoRgo28MpBfD9s4K9Aq9nA4/edit?usp=sharing].
 On average, we have
Flink 1.10      01m59s
Flink 1.11      01m50s
Note that less is better in this case as we measure the time needed to process 
1m elements.

So TL;DR in this particular benchmark setup, it rather looks like performance 
actually improved. Note that DOP=8 is higher than what [~Aihua] used. Assuming 
that both benchmarks are okay I see 3 options to explain them.
# We may have a regression on local input channels, but an improvement for 
remote input channels. Since, remote input channels are usually the bottleneck, 
I'd say this is rather good, but ideally we can still remove the regression 
while keeping the improvement.
# Memory management in 1.11 works differently/incorrectly. My test excludes the 
memory management on TM/JM level, so that may be the root cause for the 
original regression.
# I experienced restarts due to failed checkpoints in the end. My first 
impression was that when the job is about to be finished may cause some 
in-progress checkpoints to be canceled which is propagated to checkpoint 
coordinator, which ultimately restarts the job because by default no checkpoint 
is allowed to fail. In my final setup, I ignored these errors, but it is 
obvious that any restart would impact the performance tremendously. In my 
setup, I even ran in some kind of live lock for 1m records (100k records didn't 
suffer from these issues oddly).

I'm attaching a log that shows this live lock. [~roman_khachatryan] 
investigated but couldn't find anything suspicious.

The key errors are

{noformat}
2020-06-26 14:53:09,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
        at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_252]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_252]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_252]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
2020-06-26 14:53:09,682 DEBUG 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - 
Notification of aborted checkpoint for task Flat Map (1/1)
2020-06-26 14:53:09,682 DEBUG 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Cleanup 
AsyncCheckpointRunnable for checkpoint 1 of Flat Map (1/1).
2020-06-26 14:53:09,683 DEBUG 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map 
(1/1) - asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.CancellationException: null
        at java.util.concurrent.FutureTask.report(FutureTask.java:121) 
~[?:1.8.0_252]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
~[?:1.8.0_252]
        at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
 [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_252]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_252]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
{noformat}

maybe [~pnowojski] or [~zjwang] have more ideas. [^flink_11.log.gz] 


> From the end-to-end performance test results, 1.11 has a regression
> -------------------------------------------------------------------
>
>                 Key: FLINK-18433
>                 URL: https://issues.apache.org/jira/browse/FLINK-18433
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream
>    Affects Versions: 1.11.0
>         Environment: 3 machines
> [|https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations_1.11/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
>            Reporter: Aihua Li
>            Priority: Major
>
>  
> I ran end-to-end performance tests between the Release-1.10 and Release-1.11. 
> the results were as follows:
> |scenarioName|release-1.10|release-1.11| |
> |OneInput_Broadcast_LazyFromSource_ExactlyOnce_10_rocksdb|46.175|43.81333333|-5.11%|
> |OneInput_Rescale_LazyFromSource_ExactlyOnce_100_heap|211.835|200.355|-5.42%|
> |OneInput_Rebalance_LazyFromSource_ExactlyOnce_1024_rocksdb|1721.041667|1618.323333|-5.97%|
> |OneInput_KeyBy_LazyFromSource_ExactlyOnce_10_heap|46|43.615|-5.18%|
> |OneInput_Broadcast_Eager_ExactlyOnce_100_rocksdb|212.105|199.6883333|-5.85%|
> |OneInput_Rescale_Eager_ExactlyOnce_1024_heap|1754.64|1600.123333|-8.81%|
> |OneInput_Rebalance_Eager_ExactlyOnce_10_rocksdb|45.91666667|43.09833333|-6.14%|
> |OneInput_KeyBy_Eager_ExactlyOnce_100_heap|212.0816667|200.7266667|-5.35%|
> |OneInput_Broadcast_LazyFromSource_AtLeastOnce_1024_rocksdb|1718.245|1614.381667|-6.04%|
> |OneInput_Rescale_LazyFromSource_AtLeastOnce_10_heap|46.12|43.55166667|-5.57%|
> |OneInput_Rebalance_LazyFromSource_AtLeastOnce_100_rocksdb|212.0383333|200.3883333|-5.49%|
> |OneInput_KeyBy_LazyFromSource_AtLeastOnce_1024_heap|1762.048333|1606.408333|-8.83%|
> |OneInput_Broadcast_Eager_AtLeastOnce_10_rocksdb|46.05833333|43.49666667|-5.56%|
> |OneInput_Rescale_Eager_AtLeastOnce_100_heap|212.2333333|201.1883333|-5.20%|
> |OneInput_Rebalance_Eager_AtLeastOnce_1024_rocksdb|1720.663333|1616.85|-6.03%|
> |OneInput_KeyBy_Eager_AtLeastOnce_10_heap|46.14|43.62333333|-5.45%|
> |TwoInputs_Broadcast_LazyFromSource_ExactlyOnce_100_rocksdb|156.9183333|152.9566667|-2.52%|
> |TwoInputs_Rescale_LazyFromSource_ExactlyOnce_1024_heap|1415.511667|1300.1|-8.15%|
> |TwoInputs_Rebalance_LazyFromSource_ExactlyOnce_10_rocksdb|34.29666667|34.16666667|-0.38%|
> |TwoInputs_KeyBy_LazyFromSource_ExactlyOnce_100_heap|158.3533333|151.8483333|-4.11%|
> |TwoInputs_Broadcast_Eager_ExactlyOnce_1024_rocksdb|1373.406667|1300.056667|-5.34%|
> |TwoInputs_Rescale_Eager_ExactlyOnce_10_heap|34.57166667|32.09666667|-7.16%|
> |TwoInputs_Rebalance_Eager_ExactlyOnce_100_rocksdb|158.655|147.44|-7.07%|
> |TwoInputs_KeyBy_Eager_ExactlyOnce_1024_heap|1356.611667|1292.386667|-4.73%|
> |TwoInputs_Broadcast_LazyFromSource_AtLeastOnce_10_rocksdb|34.01|33.205|-2.37%|
> |TwoInputs_Rescale_LazyFromSource_AtLeastOnce_100_heap|149.5883333|145.9966667|-2.40%|
> |TwoInputs_Rebalance_LazyFromSource_AtLeastOnce_1024_rocksdb|1359.74|1299.156667|-4.46%|
> |TwoInputs_KeyBy_LazyFromSource_AtLeastOnce_10_heap|34.025|29.68333333|-12.76%|
> |TwoInputs_Broadcast_Eager_AtLeastOnce_100_rocksdb|157.3033333|151.4616667|-3.71%|
> |TwoInputs_Rescale_Eager_AtLeastOnce_1024_heap|1368.74|1293.238333|-5.52%|
> |TwoInputs_Rebalance_Eager_AtLeastOnce_10_rocksdb|34.325|33.285|-3.03%|
> |TwoInputs_KeyBy_Eager_AtLeastOnce_100_heap|162.5116667|134.375|-17.31%|
> It can be seen that the performance of 1.11 has a regression, basically 
> around 5%, and the maximum regression is 17%. This needs to be checked.
> the test code:
> flink-1.10.0: 
> [https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
> flink-1.11.0: 
> [https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations_1.11/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
> commit cmd like tis:
> bin/flink run -d -m 192.168.39.246:8081 -c 
> org.apache.flink.basic.operations.PerformanceTestJob 
> /home/admin/flink-basic-operations_2.11-1.10-SNAPSHOT.jar --topologyName 
> OneInput --LogicalAttributesofEdges Broadcast --ScheduleMode LazyFromSource 
> --CheckpointMode ExactlyOnce --recordSize 10 --stateBackend rocksdb
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to