[ 
https://issues.apache.org/jira/browse/FLINK-34132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813095#comment-17813095
 ] 

Zhu Zhu commented on FLINK-34132:
---------------------------------

We should also migrate the existing batch examples from the DataSet API to the 
DataStream API so that they work directly with the AdaptiveBatchScheduler. 
This work needs to be done before the DataSet API is removed in Flink 2.0.
cc [~Wencong Liu]

> Batch WordCount job fails when run with AdaptiveBatch scheduler
> ---------------------------------------------------------------
>
>                 Key: FLINK-34132
>                 URL: https://issues.apache.org/jira/browse/FLINK-34132
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.17.1, 1.18.1
>            Reporter: Prabhu Joseph
>            Assignee: Junrui Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> Batch WordCount job fails when run with AdaptiveBatch scheduler.
> *Repro Steps*
> {code:java}
> flink-yarn-session -Djobmanager.scheduler=adaptive -d
> flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
> {code}
> *Error logs*
> {code:java}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>       at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
>       at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
>       at 
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
>       at 
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>       ... 12 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
>       ... 20 more
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>       at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: At the moment, adaptive batch scheduler 
> requires batch workloads to be executed with types of all edges being 
> BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
> 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>       ... 3 more
> Caused by: java.lang.IllegalStateException: At the moment, adaptive batch 
> scheduler requires batch workloads to be executed with types of all edges 
> being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to 
> configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
> 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
>       at 
> org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
>       at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>       at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       ... 3 more
> {code}
> *Analysis*
> I set execution.batch-shuffle-mode to each of ALL_EXCHANGES_BLOCKING, 
> ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE in turn, but 
> every attempt failed with the same error message.
> The WordCount program runs fine when the following is set in the code:
> {code:java}
> env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
> {code}
> We need to investigate why execution.batch-shuffle-mode is not being 
> recognized. If this behavior is intentional, the misleading error message 
> should be corrected. Additionally, the WordCount job needs to be updated so 
> that it runs cleanly with both the batch and adaptive schedulers.
