Hi All
I am trying to get JOB accumulators. ( I am aware that I can get the
accumulators through REST APIs as well, but i wanted to avoid JSON
parsing).
Looking at JobAccumulatorsHandler i am trying to get execution graph for
currently running job. Following is my code:
InetSocketAddress initialJobManagerAddress=new
InetSocketAddress(hostName,port);
InetAddress ownHostname;
ownHostname=
ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000,400);
ActorSystem actorSystem= AkkaUtils.createActorSystem(configuration,
new Some(new
Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));
FiniteDuration timeout= FiniteDuration.apply(10, TimeUnit.SECONDS);
ActorGateway akkaActorGateway=
LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
actorSystem,timeout
);
Future<Object> future=akkaActorGateway.ask(new
RequestJobDetails(true,false),timeout);
MultipleJobsDetails result=(MultipleJobsDetails)
Await.result(future,timeout);
ExecutionGraphHolder executionGraphHolder=new
ExecutionGraphHolder(timeout);
LOG.info(result.toString());
for(JobDetails detail:result.getRunningJobs()){
LOG.info(detail.getJobName() + " ID " + detail.getJobId());
* ExecutionGraph
executionGraph=executionGraphHolder.getExecutionGraph(detail.getJobId(),akkaActorGateway);*
LOG.info("Accumulators " +
executionGraph.aggregateUserAccumulators());
}
However, i am receiving following error in Flink:
2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody
ERROR akka.remote.EndpointWriter - Transient association error (association
remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.
CheckpointCoordinator
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[?:1.8.0_92]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_92]
at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
~[scala-library-2.10.5.jar:?]
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
~[akka-actor_2.10-2.3.7.jar:?]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
~[akka-remote_2.10-2.3.7.jar:?]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
~[akka-remote_2.10-2.3.7.jar:?]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
~[akka-remote_2.10-2.3.7.jar:?]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
~[scala-library-2.10.5.jar:?]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
~[akka-remote_2.10-2.3.7.jar:?]
Any reason why its failing? This code works when invoked through
WebRuntimeMonitor.
Regards
Sumit Chawla