[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974209#comment-16974209 ]

Zhu Zhu commented on FLINK-14735:
---------------------------------

Here's a sample test case ([testInputConstraintALLPerf|https://github.com/zhuzhurk/flink/commit/0215526015ea8c2fd82cbaa875a726f25d29f284]) that reproduces this issue.
It takes ~30s in my local environment.
With the improvement that takes the ALL-to-ALL edge into account ([commit|https://github.com/zhuzhurk/flink/commit/b4a8cb0bca1aefbda68a7a3d9c1092cb9bed2e0c]), it takes ~50ms.
With the Java Streams refactored ([commit|https://github.com/zhuzhurk/flink/commit/79bda27b6b898cb131a4384b228c6aa82a157469]), it takes ~15s.
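
To illustrate why considering the ALL-to-ALL edge can make such a difference, here is a minimal sketch. It is not the code from the commits above. It assumes that with an ALL-to-ALL edge every consumer reads the same set of upstream partitions, so the "are all inputs consumable" answer can be computed once and shared instead of being recomputed per consumer. All names (InputCheckSketch, naiveChecks, sharedChecks, isPartitionConsumable) are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class InputCheckSketch {
    // Counts partition lookups so the two strategies can be compared.
    public static long lookups = 0;

    // Hypothetical stand-in for "partition p of the upstream result is consumable".
    public static boolean isPartitionConsumable(boolean[] finished, int p) {
        lookups++;
        return finished[p];
    }

    // Naive: every consumer scans all upstream partitions,
    // so the work is O(consumers * partitions).
    public static int naiveChecks(boolean[] finished, int consumers) {
        int schedulable = 0;
        for (int c = 0; c < consumers; c++) {
            boolean all = IntStream.range(0, finished.length)
                    .allMatch(p -> isPartitionConsumable(finished, p));
            if (all) {
                schedulable++;
            }
        }
        return schedulable;
    }

    // ALL-to-ALL aware (assumption): all consumers see the same partitions,
    // so one scan answers the question for every consumer.
    public static int sharedChecks(boolean[] finished, int consumers) {
        boolean all = true;
        for (int p = 0; p < finished.length; p++) {
            if (!isPartitionConsumable(finished, p)) {
                all = false;
                break;
            }
        }
        return all ? consumers : 0;
    }

    public static void main(String[] args) {
        boolean[] finished = new boolean[1000];
        Arrays.fill(finished, true);

        lookups = 0;
        int naiveResult = naiveChecks(finished, 1000);
        long naiveLookups = lookups;

        lookups = 0;
        int sharedResult = sharedChecks(finished, 1000);
        long sharedLookups = lookups;

        System.out.println("naive:  " + naiveResult + " schedulable, " + naiveLookups + " lookups");
        System.out.println("shared: " + sharedResult + " schedulable, " + sharedLookups + " lookups");
    }
}
```

With 1000 producers and 1000 consumers the naive variant performs a million lookups for a single pass, while the shared variant performs a thousand. That is the kind of gap that would be consistent with the drop from seconds to milliseconds reported above, although the real scheduler does more than this sketch.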

> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
>                 Key: FLINK-14735
>                 URL: https://issues.apache.org/jira/browse/FLINK-14735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Jingsong Lee
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Now if we launch a batch job with 1000+ parallelism, the heartbeat is likely to 
> time out even if we set the akka timeout to 2 minutes.
>  The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
>         at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>         at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
>         at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
>         at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
>         at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
>         at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
>         at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>         at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
>         at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
>         at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}
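
The frames in the trace above suggest a nested check: an IntStream range with allMatch over the inputs, whose predicate runs anyMatch over an array for each input. The following is only a rough sketch of that call shape, not the actual ExecutionVertex logic. The class name, method names, and the Boolean[][] data layout are hypothetical and chosen purely for illustration. It also shows the same check written as plain loops, which is one way a stream-based check like this could be refactored.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class NestedMatchSketch {
    // Stream form: allMatch over input indices, anyMatch over each input's entries.
    // This mirrors the IntPipeline.allMatch -> ReferencePipeline.anyMatch frames.
    public static boolean allInputsConsumableStreams(Boolean[][] inputs) {
        return IntStream.range(0, inputs.length)
                .allMatch(i -> Arrays.stream(inputs[i]).anyMatch(consumable -> consumable));
    }

    // Loop form: same result, without creating a stream pipeline per input.
    public static boolean allInputsConsumableLoops(Boolean[][] inputs) {
        for (Boolean[] input : inputs) {
            boolean any = false;
            for (boolean consumable : input) {
                if (consumable) {
                    any = true;
                    break;
                }
            }
            if (!any) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Boolean[][] inputs = {{false, true}, {true, false}};
        System.out.println(allInputsConsumableStreams(inputs));
        System.out.println(allInputsConsumableLoops(inputs));
    }
}
```

Both forms do the same number of comparisons, so a loop rewrite alone would only trim per-call overhead and would not change how the cost grows with parallelism.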



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
