[
https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Junrui Lee resolved FLINK-37576.
--------------------------------
Fix Version/s: 2.1.0, 2.0.1
Resolution: Fixed
master d5e1ff2ecabea0c6babdc0068b584df74571d181
release-2.0 247b070943d54dfb749bc97f8d93a87d6db8cd05
> Batch job failed when submit JobGraph contains broadcast edge
> -------------------------------------------------------------
>
> Key: FLINK-37576
> URL: https://issues.apache.org/jira/browse/FLINK-37576
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.0.0
> Reporter: Weijie Guo
> Assignee: Junrui Lee
> Priority: Critical
> Labels: pull-request-available
> Fix For: 2.1.0, 2.0.1
>
>
> When a job is submitted directly as a JobGraph, it fails with an
> IllegalArgumentException if the graph contains a broadcast edge. IMO, we
> should correctly set the {{isBroadcast}} field in the constructor of
> {{AllToAllBlockingResultInfo}}.
> For example:
> {code:java}
> public class TestJob {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // set min parallelism greater than 1
>         conf.setString(
>                 "execution.batch.adaptive.auto-parallelism.min-parallelism", "2");
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         // set to batch mode
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
>         JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf, 2, 10);
>     }
> }
> {code}
>
> {code:java}
> Caused by: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>     at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
>     at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
>     at org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>     ... 29 more
> {code}
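
The precondition failure in the trace above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Flink's actual code: the class `BroadcastRangeSketch`, the method `consumedRange`, and the exact form of the two checks are hypothetical stand-ins. It assumes that a broadcast result exposes a single subpartition read by every consumer, and that a non-broadcast all-to-all result needs at least one subpartition per consumer.

```java
// Simplified model, NOT Flink's implementation; all names here are hypothetical.
public class BroadcastRangeSketch {

    /** Returns the inclusive [start, end] subpartition range read by one consumer subtask. */
    public static int[] consumedRange(
            int consumerIndex, int numConsumers, int numSubpartitions, boolean isBroadcast) {
        if (isBroadcast) {
            // Assumption: a broadcast result has one subpartition that all consumers read.
            checkArgument(numSubpartitions == 1);
            return new int[] {0, 0};
        }
        // Assumption: a non-broadcast result needs at least one subpartition per consumer.
        checkArgument(numConsumers <= numSubpartitions);
        int start = consumerIndex * numSubpartitions / numConsumers;
        int end = (consumerIndex + 1) * numSubpartitions / numConsumers - 1;
        return new int[] {start, end};
    }

    /** Minimal stand-in for a precondition check that throws without a message. */
    public static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    public static void main(String[] args) {
        // Flag set correctly: the second of two consumers reads the single subpartition.
        System.out.println(java.util.Arrays.toString(consumedRange(1, 2, 1, true)));
        // Flag lost: the same single-subpartition result is rejected for two consumers.
        try {
            consumedRange(1, 2, 1, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e);
        }
    }
}
```

In this model, a minimum parallelism of 2 combined with a result that is treated as non-broadcast is enough to trip the check, which would be consistent with the reproduction setting `min-parallelism` to 2.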