[ https://issues.apache.org/jira/browse/FLINK-14735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974209#comment-16974209 ]
Zhu Zhu commented on FLINK-14735:
---------------------------------

Here's a sample test case ([testInputConstraintALLPerf|https://github.com/zhuzhurk/flink/commit/0215526015ea8c2fd82cbaa875a726f25d29f284]) that reproduces this issue. It takes ~30s in my local environment. With the improvement that takes ALL-to-ALL edges into account ([commit|https://github.com/zhuzhurk/flink/commit/b4a8cb0bca1aefbda68a7a3d9c1092cb9bed2e0c]), it takes ~50ms. With the Java Streams usage refactored ([commit|https://github.com/zhuzhurk/flink/commit/79bda27b6b898cb131a4384b228c6aa82a157469]), it takes ~15s.

> Improve batch schedule check input consumable performance
> ---------------------------------------------------------
>
> Key: FLINK-14735
> URL: https://issues.apache.org/jira/browse/FLINK-14735
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Jingsong Lee
> Priority: Major
> Fix For: 1.10.0
>
> Now if we launch a batch job with 1000+ parallelism, the heartbeat is
> likely to time out even with the akka timeout set to 2 minutes.
> The JobMaster is busy:
> {code:java}
> java.lang.Thread.State: RUNNABLE
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958)
> 	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
> 	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> 	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> 	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
> 	at org.apache.flink.runtime.executiongraph.ExecutionVertex.isInputConsumable(ExecutionVertex.java:824)
> 	at org.apache.flink.runtime.executiongraph.ExecutionVertex$$Lambda$257/564237119.test(Unknown Source)
> 	at java.util.stream.MatchOps$2MatchSink.accept(MatchOps.java:119)
> 	at java.util.stream.Streams$RangeIntSpliterator.tryAdvance(Streams.java:89)
> 	at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
> 	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> 	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
> 	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.IntPipeline.allMatch(IntPipeline.java:482)
> 	at org.apache.flink.runtime.executiongraph.ExecutionVertex.checkInputDependencyConstraints(ExecutionVertex.java:811)
> 	at org.apache.flink.runtime.executiongraph.Execution.scheduleOrUpdateConsumers(Execution.java:889)
> 	at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1074)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1597)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1570)
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:424)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
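The stack trace shows nested Stream pipelines (`allMatch` over input gates inside `checkInputDependencyConstraints`, `anyMatch` over result partitions inside `isInputConsumable`) being rebuilt on every check as tasks finish. A minimal, hypothetical sketch of the stream-refactoring idea (the class and method names below are illustrative, not Flink's actual code): the same ALL-constraint check written with plain loops that exit early and allocate no pipeline objects per call.

```java
// Hypothetical sketch, not Flink code: finished[i][j] is true when
// partition j of input i is ready. Under the ALL constraint, the vertex
// is schedulable only if every input has at least one ready partition.
class InputCheck {
    static boolean allInputsConsumableLoops(boolean[][] finished) {
        for (boolean[] input : finished) {     // every input gate...
            boolean anyReady = false;
            for (boolean partition : input) {  // ...needs >= 1 ready partition
                if (partition) {
                    anyReady = true;
                    break;                     // early exit, no lambda frames
                }
            }
            if (!anyReady) {
                return false;
            }
        }
        return true;
    }
}
```

This removes per-call spliterator and lambda allocation, which explains the ~30s to ~15s improvement reported in the comment, but the check is still O(inputs × partitions) per invocation.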
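The much larger win (~30s to ~50ms) comes from exploiting ALL-to-ALL edges: every consumer of an ALL-to-ALL result sees the same producer set, so the consumable status can be computed once per intermediate result and cached, instead of being re-derived by each of the N consumers. A hypothetical sketch of that idea (`ConsumableStatusCache` and its methods are illustrative, not Flink's actual API):

```java
// Hypothetical sketch, not Flink code: track how many producer partitions
// of one intermediate result have finished, so each of the N consumers of
// an ALL-to-ALL edge can query consumability in O(1) instead of scanning
// all N producer partitions itself.
class ConsumableStatusCache {
    private final int totalPartitions;
    private int finishedPartitions = 0;
    private Boolean cachedAllConsumable = null;

    ConsumableStatusCache(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    // Called when a producer task finishes its result partition.
    void markPartitionFinished() {
        finishedPartitions++;
        cachedAllConsumable = null; // invalidate the cached answer
    }

    // For a blocking ALL-to-ALL result, the input is consumable for
    // every consumer only once all producer partitions have finished.
    boolean areAllPartitionsConsumable() {
        if (cachedAllConsumable == null) {
            cachedAllConsumable = (finishedPartitions == totalPartitions);
        }
        return cachedAllConsumable;
    }
}
```

With a shared cache like this, the total cost of consumable checks over a job run drops from O(N²) partition scans to O(N) counter updates, which matches the order-of-magnitude improvement the commit reports.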