Hi,

I noticed that you configured the Akka framesize to 2GB (the default is 10MB). That seems like quite a lot to me, and it might be causing problems, since the exceptions indicate an Akka timeout. Did you configure the framesize that high for a particular reason?
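To put the two numbers side by side, here is a small stand-alone Java sketch that compares the value passed via -yD akka.framesize=2097152000b with the 10485760b default. It assumes the value is a plain byte count with a trailing "b"; the class name and the parseBytes helper are made up for illustration and are not part of Flink's API.

```java
import java.util.Locale;

// Hypothetical helper class, not Flink code: compares a configured
// akka.framesize value against the 10485760b (10 MiB) default.
final class FrameSizeCheck {

    // Default Akka frame size in Flink: 10485760 bytes (10 MiB).
    static final long DEFAULT_FRAME_SIZE_BYTES = 10_485_760L;

    // Assumption: the value is a plain byte count with an optional trailing 'b'.
    static long parseBytes(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (v.endsWith("b")) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) {
        long configured = parseBytes("2097152000b");
        long mib = 1024L * 1024L;
        System.out.printf("configured: %d bytes (%d MiB)%n", configured, configured / mib);
        System.out.printf("default:    %d bytes (%d MiB)%n",
                DEFAULT_FRAME_SIZE_BYTES, DEFAULT_FRAME_SIZE_BYTES / mib);
        System.out.printf("ratio:      %dx the default%n", configured / DEFAULT_FRAME_SIZE_BYTES);
    }
}
```

With the value from your command line, the configured frame is 2000 MiB versus 10 MiB by default, i.e. 200 times larger.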
It seems that you are running a custom build of Flink. Which version did you base your build on?

Best,
Fabian

2018-05-08 17:41 GMT+02:00 Chan, Regina <regina.c...@gs.com>:

> There’s no explicit collect() from me. It has a coGroup operator before
> writing to the DataSink.
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Monday, May 07, 2018 6:31 AM
> *To:* Chan, Regina [Tech]
> *Cc:* user@flink.apache.org; Newport, Billy [Tech]
> *Subject:* Re: Lost JobManager
>
> Hi Regina,
>
> I see from the logs that you are using the DataSet API.
> Are you trying to fetch a large result to your client using the collect()
> method?
>
> Best,
> Fabian
>
> 2018-05-02 0:38 GMT+02:00 Chan, Regina <regina.c...@gs.com>:
>
> Hi,
>
> I’m running a single TM with the following params:
> -yn 1 -ys 2 -yjm 36864 -ytm 24576 -yD akka.framesize=2097152000b
>
> I keep getting this error where I lose the connection to the JobManager.
> It’s not clear why; the job seems to run fine up to the DataSink, but the
> JobManager is lost before that completes. I’ve attached the JobManager
> logs; a snippet is further below.
>
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:205)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
>     at com.gs.ep.da.lake.refinerlib.flink.FlowDataBase.execute(FlowDataBase.java:40)
>     at com.gs.ep.da.lake.refinerlib.flink.FlowData.runPartialDataFlow(FlowData.java:148)
>     at com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:56)
>     at com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:24)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>     ... 15 more
> Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:101)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> --------------
> JobManager logs below:
>
> 506735 [flink-akka.actor.default-dispatcher-18] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (Map at handleMilestoning(MergeTask.java:287)) (1/1) (b688af3e269a022daa48e1db7adf4b34) switched from RUNNING to FINISHED.
> 857174 [flink-akka.actor.default-dispatcher-19] INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager with final application status SUCCEEDED and diagnostics: Flink YARN Client requested shutdown
> 857175 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.yarn.YarnFlinkResourceManager - Shutting down cluster with status SUCCEEDED : Flink YARN Client requested shutdown
> 857176 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.yarn.YarnFlinkResourceManager - Unregistering application from the YARN Resource Manager
> 857180 [flink-akka.actor.default-dispatcher-21] INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
> 857282 [AMRM Callback Handler Thread] INFO org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
>     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:275)
> 857323 [flink-akka.actor.default-dispatcher-21] ERROR org.apache.hadoop.yarn.client.api.impl.NMClientImpl - Failed to stop Container container_1524967251050_56140_01_000003when stopping NMClientImpl
> 857323 [flink-akka.actor.default-dispatcher-21] INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Closing proxy : d191291-019.dc.gs.com:45454
> 857326 [flink-akka.actor.default-dispatcher-19] INFO org.apache.flink.yarn.YarnJobManager - Deleting yarn application files under hdfs://d191291/user/delp/.flink/application_1524967251050_56140.
> 857326 [flink-akka.actor.default-dispatcher-27] ERROR org.apache.flink.yarn.YarnApplicationMasterRunner - Actor akka://flink/user/$b#256544107 terminated, stopping process...
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.http.authentication.kerberos.keytab; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.http.authentication.kerberos.principal; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.http.authentication.signature.secret.file; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.http.authentication.simple.anonymous.allowed; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.http.authentication.type; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.kerberos.kinit.command; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.rpc.protection; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.auth_to_local; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.authentication; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.authorization; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping; Ignoring.
> 857345 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.base; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.bind.password.file; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.search.attr.group.name; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.search.attr.member; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.search.filter.group; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: hadoop.security.group.mapping.ldap.url; Ignoring.
> 857346 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/core-site.xml:an attempt to override final parameter: ipc.client.fallback-to-simple-auth-allowed; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.block.access.token.enable; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.client.block.write.replace-datanode-on-failure.policy; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.client.read.shortcircuit; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.cluster.administrators; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.address; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.http.address; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.ipc.address; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.kerberos.principal; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.keytab.file; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.domain.socket.path; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.hosts; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.hosts.exclude; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.kerberos.internal.spnego.principal; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.kerberos.principal; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.keytab.file; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.permissions.superusergroup; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.secondary.namenode.kerberos.internal.spnego.principal; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.secondary.namenode.kerberos.principal; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.secondary.namenode.keytab.file; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.web.authentication.kerberos.keytab; Ignoring.
> 857347 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.conf.Configuration - /gns/software/infra/big-data/hadoop/hdp-2.2.gs6/hadoop/conf/hdfs-site.xml:an attempt to override final parameter: dfs.web.authentication.kerberos.principal; Ignoring.
> 857349 [flink-akka.actor.default-dispatcher-19] WARN org.apache.hadoop.hdfs.BlockReaderLocal - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
> 857356 [flink-akka.actor.default-dispatcher-19] INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager akka.tcp://fl...@d191291-053.dc.gs.com:33812/user/jobmanager.
> 857357 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Tue May 01 17:58:37 EDT 2018 (1673e636d863ff81916b47c4296ed026) switched from state RUNNING to SUSPENDED.
> java.lang.Exception: The JobManager is shutting down.
>     at org.apache.flink.runtime.jobmanager.JobManager.postStop(JobManager.scala:240)
>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundPostStop(JobManager.scala:125)
>     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>     at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:241)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> *Regina Chan*
> *Goldman Sachs* – Enterprise Platforms, Data Architecture
> *30 Hudson Street, 37th floor | Jersey City, NY 07302*
> (212) 902-5697
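For reference, the launch parameters quoted above can be decoded with a quick stand-alone sketch. It assumes the Flink 1.x YARN CLI meanings of the flags (-yn: number of TaskManager containers, -ys: slots per TaskManager, -yjm / -ytm: JobManager / TaskManager container memory in MB, -yD: dynamic configuration property) and that each flag appears only once; YarnFlagsSummary and parseFlags are hypothetical names, not Flink code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class, not Flink code: summarizes YARN launch flags.
final class YarnFlagsSummary {

    // Splits "-flag value" pairs into an ordered map.
    // Assumption: every flag takes exactly one value and appears only once.
    static Map<String, String> parseFlags(String cmdLine) {
        String[] tokens = cmdLine.trim().split("\\s+");
        Map<String, String> flags = new LinkedHashMap<>();
        for (int i = 0; i + 1 < tokens.length; i += 2) {
            flags.put(tokens[i], tokens[i + 1]);
        }
        return flags;
    }

    public static void main(String[] args) {
        Map<String, String> f =
                parseFlags("-yn 1 -ys 2 -yjm 36864 -ytm 24576 -yD akka.framesize=2097152000b");
        int taskManagers = Integer.parseInt(f.get("-yn"));
        int slotsPerTm = Integer.parseInt(f.get("-ys"));
        long jmMb = Long.parseLong(f.get("-yjm"));   // JobManager container memory in MB
        long tmMb = Long.parseLong(f.get("-ytm"));   // TaskManager container memory in MB
        System.out.println("TaskManagers: " + taskManagers + ", total slots: " + taskManagers * slotsPerTm);
        System.out.println("JobManager memory: " + jmMb / 1024 + " GiB");
        System.out.println("TaskManager memory: " + tmMb / 1024 + " GiB");
        System.out.println("Dynamic property: " + f.get("-yD"));
    }
}
```

With the values from the thread, this reports 1 TaskManager with 2 slots in total, a 36 GiB JobManager container and a 24 GiB TaskManager container, plus the akka.framesize override.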