[ https://issues.apache.org/jira/browse/FLINK-34132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu updated FLINK-34132:
----------------------------
    Component/s: Documentation

> Batch WordCount job fails when run with AdaptiveBatch scheduler
> ---------------------------------------------------------------
>
>                 Key: FLINK-34132
>                 URL: https://issues.apache.org/jira/browse/FLINK-34132
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.17.1, 1.18.1
>            Reporter: Prabhu Joseph
>            Assignee: Junrui Li
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>
> Batch WordCount job fails when run with AdaptiveBatch scheduler.
> *Repro Steps*
> {code:java}
> flink-yarn-session -Djobmanager.scheduler=adaptive -d
> flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
> {code}
> *Error logs*
> {code:java}
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
>     at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 12 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
>     ... 20 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>     at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>     ... 3 more
> Caused by: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     ... 3 more
> {code}
> *Analysis*
> I set execution.batch-shuffle-mode to each of ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE in turn, but every attempt produced the same error message.
> The WordCount program runs fine when the following is set in the code:
> {code:java}
> env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
> {code}
> We need to investigate why execution.batch-shuffle-mode is not being recognized and, if this behavior is intentional, correct the misleading error message. Additionally, the WordCount job should be updated so that it runs with both the batch and adaptive schedulers.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
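
The precondition that fails in the stack trace above can be modelled roughly as follows. This is a simplified sketch in plain Java, not Flink's actual code: the class `EdgeTypeCheckSketch`, the `EdgeType` enum, and the `isAccepted` helper are hypothetical names introduced for illustration. It also assumes, consistent with the observation that `ExecutionMode.BATCH_FORCED` makes the job run, that for this WordCount job the edge types are decided by the job's execution mode and not by `execution.batch-shuffle-mode`.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class EdgeTypeCheckSketch {

    // Hypothetical stand-in for the edge (result partition) types named in the error message.
    enum EdgeType { PIPELINED, BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE }

    // Edge types the error message lists as accepted by the adaptive batch scheduler.
    private static final Set<EdgeType> SUPPORTED =
            EnumSet.of(EdgeType.BLOCKING, EdgeType.HYBRID_FULL, EdgeType.HYBRID_SELECTIVE);

    // Simplified model of the check: reject the job if any edge has an unsupported type.
    static void checkAllExchangesAreSupported(List<EdgeType> edges) {
        for (EdgeType edge : edges) {
            if (!SUPPORTED.contains(edge)) {
                throw new IllegalStateException(
                        "All edges must be BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE, found " + edge);
            }
        }
    }

    // Convenience wrapper that reports the outcome as a boolean instead of throwing.
    static boolean isAccepted(List<EdgeType> edges) {
        try {
            checkAllExchangesAreSupported(edges);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: a job left in its default mode ends up with pipelined edges.
        List<EdgeType> defaultModeJob = Arrays.asList(EdgeType.PIPELINED, EdgeType.PIPELINED);
        // Assumption: forcing batch exchanges makes every edge blocking.
        List<EdgeType> batchForcedJob = Arrays.asList(EdgeType.BLOCKING, EdgeType.BLOCKING);

        System.out.println("default mode accepted: " + isAccepted(defaultModeJob));   // false
        System.out.println("batch forced accepted: " + isAccepted(batchForcedJob));   // true
    }
}
```

Under these assumptions, changing a configuration option that does not influence the edge types would leave the first case unchanged, which would match the behaviour described in the analysis.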