Hi guys,

We have run into an issue running a Beam job with the Flink Runner on EMR in application and per-job-cluster modes (it works fine in session mode). The Job Manager starts, but the Task Managers do not.
Beam 2.27, 2.28
Flink 1.11.2
EMR 6.2.0

Job Manager log:

…
INFO: JobManager runner for job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://[email protected]:35491/user/rpc/jobmanager_2.
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.jobmaster.JobMaster startJobExecution
INFO: Starting execution of job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) under job master id 00000000000000000000000000000000.
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.scheduler.DefaultScheduler startSchedulingInternal
INFO: Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
INFO: Job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) switched from state CREATED to RUNNING.
…
INFO: Could not resolve ResourceManager address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
…
SEVERE: Unhandled exception.
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:302)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:58)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$10(AkkaRpcService.java:520)
    at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59)
    ... 8 more
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
    ... 10 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/user/rpc/resourcemanager_*)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:71)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:69)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:81)

Using the same cluster with the same Flink config, we can run the Flink example jobs, so I suspect the issue is on the Beam side.

Could you please advise whether anyone has faced similar issues before, or has been able to successfully run a streaming Beam job on EMR YARN in application or per-job-cluster mode?

Best regards,
Dmytro Dragan | [email protected] | Lead Big Data Engineer | SoftServe <http://www.softserveinc.com/>
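The failure pattern in the log (a "retrying in 10000 ms" message while resolving resourcemanager_*, then FutureUtils$RetryException "Number of retries has been exhausted") is a bounded retry-with-delay loop around an address lookup. As an illustration only, here is a minimal Java sketch of that pattern using plain CompletableFuture. RetryWithDelaySketch, retryWithDelay and RetryException are hypothetical names, not Flink's FutureUtils API, and the always-failing lookup is an assumed stand-in for the resourcemanager_* actor that the log reports as not found.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RetryWithDelaySketch {

    /** Hypothetical exception mirroring the "retries exhausted" message in the log. */
    static class RetryException extends Exception {
        RetryException(String msg, Throwable cause) { super(msg, cause); }
    }

    /** Runs the operation, retrying up to `retries` more times with a fixed delay between attempts. */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation, int retries, long delayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, long delayMs,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value); // lookup succeeded
            } else if (retriesLeft > 0) {
                // Same shape as the INFO line: log and schedule another attempt after the delay.
                System.out.println("Could not resolve address, retrying in " + delayMs + " ms");
                scheduler.schedule(
                        () -> attempt(operation, retriesLeft - 1, delayMs, scheduler, result),
                        delayMs, TimeUnit.MILLISECONDS);
            } else {
                // No retries left: fail with the last error as the cause.
                result.completeExceptionally(new RetryException(
                        "Could not complete the operation. Number of retries has been exhausted.", error));
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Assumed stand-in for resolving an actor that never exists: every attempt fails.
        Supplier<CompletableFuture<String>> lookup = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("Actor not found"));
            return f;
        };
        try {
            retryWithDelay(lookup, 3, 10, scheduler).join();
        } catch (CompletionException e) {
            System.out.println("Failed: " + e.getCause().getMessage());
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Because every attempt in the sketch fails the same way, the retries only postpone the final RetryException; in the same way, the repeated "retrying in 10000 ms" lines do not help if the ResourceManager actor is never found.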
