My only two cents: when I set the memory pre-allocation parameter to true and
started tuning the number of slots and buffers, I began to get all kinds of
Akka and Disassociated exceptions thrown by the JM regarding the TMs. Since I
am also not familiar with Akka internals, I went back to my previous config
and continued turning only the knobs that did not cause Akka exceptions.
Thanks + regards,
Amir
From: "Chawla,Sumit" <[email protected]>
To: [email protected]
Sent: Wednesday, September 21, 2016 11:08 AM
Subject: Re: Get Flink ExecutionGraph Programmatically
Hi Chesnay,
I am actually running this code in the same JVM as the WebInterface and
JobManager. I start the JobManager programmatically and then run this code in
the same JVM to query metrics. The only difference could be that I am creating
a new Akka ActorSystem and ActorGateway. I am not sure whether that forces the
code to execute as if the request were coming over the wire. I am not very
familiar with Akka internals, so maybe somebody can shed some light on it.
Regards
Sumit Chawla
On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <[email protected]>
wrote:
> Hello,
>
> this is a rather subtle issue you stumbled upon here.
>
> The ExecutionGraph is not serializable. The only reason why the
> WebInterface can access it is because it runs in the same JVM as the
> JobManager.
>
> I'm not sure whether there is a way to do what you are trying to do.
>
> Regards,
> Chesnay
>
>
> On 21.09.2016 06:11, Chawla,Sumit wrote:
>
>> Hi All
>>
>>
>> I am trying to get job accumulators. (I am aware that I can get the
>> accumulators through the REST APIs as well, but I wanted to avoid JSON
>> parsing.)
>>
>> Looking at JobAccumulatorsHandler, I am trying to get the execution graph
>> for the currently running job. The following is my code:
>>
>> InetSocketAddress initialJobManagerAddress =
>>     new InetSocketAddress(hostName, port);
>> InetAddress ownHostname =
>>     ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
>>
>> // Separate ActorSystem for this client; 0 is passed as the listening port.
>> ActorSystem actorSystem = AkkaUtils.createActorSystem(
>>     configuration,
>>     new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
>>
>> FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);
>>
>> ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
>>     LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
>>     actorSystem,
>>     timeout);
>>
>> Future<Object> future =
>>     akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
>>
>> MultipleJobsDetails result =
>>     (MultipleJobsDetails) Await.result(future, timeout);
>> ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
>> LOG.info(result.toString());
>> for (JobDetails detail : result.getRunningJobs()) {
>>     LOG.info(detail.getJobName() + " ID " + detail.getJobId());
>>
>>     // Line emphasized in the original message.
>>     ExecutionGraph executionGraph =
>>         executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
>>
>>     LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
>> }
>>
>>
>> However, I am receiving the following error in Flink:
>>
>> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
>> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
>>
>> Any idea why it is failing? This code works when invoked through
>> WebRuntimeMonitor.
>>
>> Regards
>> Sumit Chawla
>>
>>
>
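
The stack trace quoted above shows Java serialization walking an object's fields and failing on a nested CheckpointCoordinator. As a rough illustration of that failure mode only, here is a minimal sketch in plain Java. The classes Graph and Coordinator and the helper firstNonSerializableClass are hypothetical stand-ins invented for this example; they are not Flink classes and do not reflect how ExecutionGraph is actually implemented. The sketch assumes nothing beyond the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical component that does NOT implement Serializable.
    static class Coordinator {
        final String name = "coordinator";
    }

    // Hypothetical message payload: Serializable itself, but it holds a
    // reference to a component that is not.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final Coordinator coordinator;

        Graph(String jobName, Coordinator coordinator) {
            this.jobName = jobName;
            this.coordinator = coordinator;
        }
    }

    // Tries to Java-serialize the object into an in-memory buffer.
    // Returns null on success, otherwise the message carried by the
    // NotSerializableException (the name of the offending class).
    static String firstNonSerializableClass(Object message) throws IOException {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Passing the reference around inside one JVM needs no serialization.
        Graph local = new Graph("demo-job", new Coordinator());
        System.out.println("In-JVM access works: " + local.jobName);

        // Serializing the same object fails on the nested field.
        System.out.println("Offending class: " + firstNonSerializableClass(local));
    }
}
```

A check like this can be run against any object before sending it as a remote message, to see whether its full object graph survives Java serialization. It says nothing about why a particular gateway treats a call as remote.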