[
https://issues.apache.org/jira/browse/FLINK-37576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Weijie Guo updated FLINK-37576:
-------------------------------
Priority: Blocker (was: Major)
> Batch job failed when submit JobGraph contains broadcast edge
> -------------------------------------------------------------
>
> Key: FLINK-37576
> URL: https://issues.apache.org/jira/browse/FLINK-37576
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.0.0
> Reporter: Weijie Guo
> Priority: Blocker
>
> When we submit a job via JobGraph directly, it will raise if we have a
> broadcast edge. IMO, We should correctly set the {{isBroadcast}} field in
> constructor of
> {{{}AllToAllBlockingResultInfo{}}}.
> For example:
> {code:java}
> public class TestJob {
> public static void main(String[] args) throws Exception {
> Configuration conf = new Configuration();
> // set min parallelism great then 1
>
> conf.setString("execution.batch.adaptive.auto-parallelism.min-parallelism",
> "2");
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
> // env.disableOperatorChaining();
> // set to batch mode
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.fromSequence(0, 10).broadcast().sinkTo(new DiscardingSink<>());
> JobGraphRunningUtil.execute(env.getStreamGraph().getJobGraph(), conf,
> 2, 10);
> }
> } {code}
>
> {code:java}
> Caused by: java.lang.IllegalArgumentException:
> java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
> at
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(VertexInputInfoComputationUtils.java:235)
> at
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(VertexInputInfoComputationUtils.java:185)
> at
> org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfos(VertexInputInfoComputationUtils.java:82)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.computeJobVertexInputInfosForInputsWithInterKeysCorrelation(AllToAllVertexInputInfoComputer.java:195)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.util.AllToAllVertexInputInfoComputer.compute(AllToAllVertexInputInfoComputer.java:126)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForNonSource(DefaultVertexParallelismAndInputInfosDecider.java:237)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(DefaultVertexParallelismAndInputInfosDecider.java:156)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.tryDecideParallelismAndInputInfos(AdaptiveBatchScheduler.java:740)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:698)
> at
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$onTaskFinished$4(AdaptiveBatchScheduler.java:417)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
> ... 29 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)