My only 2 cents: when I set the memory pre-allocation parameter to true and started adjusting the number of slots and buffers, I began getting all kinds of Akka and Disassociated exceptions thrown by the JM regarding the TMs. Since I am also not well aware of Akka internals, I went back to my previous config and continued turning only the knobs that didn't cause Akka exceptions.

Thanks + regards,
Amir

From: "Chawla,Sumit" <sumitkcha...@gmail.com>
To: dev@flink.apache.org
Sent: Wednesday, September 21, 2016 11:08 AM
Subject: Re: Get Flink ExecutionGraph Programmatically

Hi Chesnay

I am actually running this code in the same JVM as the WebInterface and JobManager. I am programmatically starting the JobManager and then running this code in the same JVM to query metrics. The only difference could be that I am creating a new Akka ActorSystem and ActorGateway. I am not sure whether that forces the code to execute as if the request were coming over the wire. I am not very well aware of Akka internals, so maybe somebody can shed some light on it.

Regards
Sumit Chawla

On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <ches...@apache.org> wrote:

> Hello,
>
> this is a rather subtle issue you stumbled upon here.
>
> The ExecutionGraph is not serializable. The only reason why the
> WebInterface can access it is because it runs in the same JVM as the
> JobManager.
>
> I'm not sure if there is a way for what you are trying to do.
>
> Regards,
> Chesnay
>
>
> On 21.09.2016 06:11, Chawla,Sumit wrote:
>
>> Hi All
>>
>>
>> I am trying to get JOB accumulators. ( I am aware that I can get the
>> accumulators through REST APIs as well, but i wanted to avoid JSON
>> parsing).
>>
>> Looking at JobAccumulatorsHandler i am trying to get execution graph for
>> currently running job.
>> Following is my code:
>>
>> InetSocketAddress initialJobManagerAddress=new InetSocketAddress(hostName,port);
>> InetAddress ownHostname;
>> ownHostname=ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000,400);
>>
>> ActorSystem actorSystem=AkkaUtils.createActorSystem(configuration,
>>     new Some(new Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));
>>
>> FiniteDuration timeout=FiniteDuration.apply(10, TimeUnit.SECONDS);
>>
>> ActorGateway akkaActorGateway=LeaderRetrievalUtils.retrieveLeaderGateway(
>>     LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
>>     actorSystem,timeout
>> );
>>
>> Future<Object> future=akkaActorGateway.ask(new RequestJobDetails(true,false),timeout);
>>
>> MultipleJobsDetails result=(MultipleJobsDetails) Await.result(future,timeout);
>> ExecutionGraphHolder executionGraphHolder=new ExecutionGraphHolder(timeout);
>> LOG.info(result.toString());
>> for(JobDetails detail:result.getRunningJobs()){
>>     LOG.info(detail.getJobName() + " ID " + detail.getJobId());
>>
>> *    ExecutionGraph executionGraph=executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);*
>>
>>     LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
>> }
>>
>>
>> However, i am receiving following error in Flink:
>>
>> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
>> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
>>
>> Any reason why its failing? This code works when invoked through
>> WebRuntimeMonitor.
>>
>> Regards
>> Sumit Chawla
>>
>>
>
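
For anyone reading this thread later: the trace shows Akka's JavaSerializer going through plain java.io.ObjectOutputStream, so the failure can be reproduced without Flink or Akka at all. Below is a minimal sketch under stated assumptions. The classes Coordinator, Graph and AccumulatorSnapshot are hypothetical stand-ins invented for illustration, not Flink classes, and the idea of sending a small serializable snapshot instead of the whole graph is only one possible workaround, not something Flink's API is known to offer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    // Hypothetical stand-in for a component that does NOT implement Serializable.
    static class Coordinator {
        int pending = 3;
    }

    // Hypothetical stand-in for a graph-like object: Serializable itself,
    // but holding a reference to a non-serializable field.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        String jobName = "demo";
        Coordinator coordinator = new Coordinator();
    }

    // Hypothetical snapshot that carries only plain serializable values.
    static class AccumulatorSnapshot implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final int pending;

        AccumulatorSnapshot(String jobName, int pending) {
            this.jobName = jobName;
            this.pending = pending;
        }
    }

    // Returns null if Java serialization succeeds, otherwise the message of
    // the NotSerializableException (the name of the offending class).
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        // Fails while writing the fields of Graph, as in the trace above.
        System.out.println("graph: " + trySerialize(graph));
        // Succeeds because the snapshot contains only a String and an int.
        AccumulatorSnapshot snapshot =
            new AccumulatorSnapshot(graph.jobName, graph.coordinator.pending);
        System.out.println("snapshot: " + trySerialize(snapshot));
    }
}
```

Within one JVM the same Graph reference can be passed around freely, which matches Chesnay's point about the WebInterface. The exception only appears once something tries to write the object to a stream, as a remote ActorSystem does.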