Hi All

I am trying to get JOB  accumulators.  ( I am aware that I can get the
accumulators through REST APIs as well, but i wanted to avoid JSON
parsing).

Looking at JobAccumulatorsHandler i am trying to get execution graph for
currently running job.  Following is my code:

  InetSocketAddress initialJobManagerAddress=new
InetSocketAddress(hostName,port);
            InetAddress ownHostname;
            ownHostname=
ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000,400);

            ActorSystem actorSystem= AkkaUtils.createActorSystem(configuration,
                    new Some(new
Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));

            FiniteDuration timeout= FiniteDuration.apply(10, TimeUnit.SECONDS);

            ActorGateway akkaActorGateway=
LeaderRetrievalUtils.retrieveLeaderGateway(

LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
                    actorSystem,timeout
            );


            Future<Object> future=akkaActorGateway.ask(new
RequestJobDetails(true,false),timeout);

            MultipleJobsDetails result=(MultipleJobsDetails)
Await.result(future,timeout);
            ExecutionGraphHolder executionGraphHolder=new
ExecutionGraphHolder(timeout);
            LOG.info(result.toString());
            for(JobDetails detail:result.getRunningJobs()){
                LOG.info(detail.getJobName() + "  ID " + detail.getJobId());

*                ExecutionGraph
executionGraph=executionGraphHolder.getExecutionGraph(detail.getJobId(),akkaActorGateway);*
               LOG.info("Accumulators " +
executionGraph.aggregateUserAccumulators());
            }


However, i am receiving following error in Flink:

2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody
ERROR akka.remote.EndpointWriter - Transient association error (association
remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.
CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[?:1.8.0_92]
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$
toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
~[akka-remote_2.10-2.3.7.jar:?]
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
~[akka-remote_2.10-2.3.7.jar:?]
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
~[akka-remote_2.10-2.3.7.jar:?]

Any reason why its failing? This code works when invoked through
WebRuntimeMonitor.

Regards
Sumit Chawla

Reply via email to