Hi XiangWei,

accessing the ExecutionGraph from outside is actually not intended, because it is a runtime component that makes little sense outside of the JobManager. The RequestJob message is only a hack to make the ExecutionGraph accessible to another actor running in the same ActorSystem, as is the case for the WebRuntimeMonitor handlers. When the request comes from a remote actor, the reply has to be serialized, and since the ExecutionGraph is not serializable you get the NotSerializableException. With Flip-6, we will make the ExecutionGraph indirectly accessible by returning an ArchivedExecutionGraph.
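
To make the distinction concrete, here is a minimal Java sketch of the general pattern. It is not Flink code, and every class name in it (SnapshotDemo, LiveGraph, ArchivedGraph, Reply) is a hypothetical stand-in. It only shows that a serializable reply wrapping a non-serializable runtime object fails under plain Java serialization, while a reply wrapping a plain-data snapshot of that object goes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical names throughout; none of these are Flink classes.
final class SnapshotDemo {

    /** Stand-in for a runtime component: it owns a live resource and is NOT Serializable. */
    static final class LiveGraph {
        final String jobName;
        final String state;
        final Thread worker = new Thread(() -> { }); // live resource, never started here

        LiveGraph(String jobName, String state) {
            this.jobName = jobName;
            this.state = state;
        }

        /** Copies only the plain data into a serializable snapshot. */
        ArchivedGraph archive() {
            return new ArchivedGraph(jobName, state);
        }
    }

    /** Plain-data snapshot that is safe to send to another process. */
    static final class ArchivedGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final String state;

        ArchivedGraph(String jobName, String state) {
            this.jobName = jobName;
            this.state = state;
        }
    }

    /** Serializable reply message that carries an arbitrary payload. */
    static final class Reply implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object payload;

        Reply(Object payload) {
            this.payload = payload;
        }
    }

    /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        LiveGraph live = new LiveGraph("wordcount", "RUNNING");
        System.out.println(canSerialize(new Reply(live)));           // false
        System.out.println(canSerialize(new Reply(live.archive()))); // true
    }
}
```

In the first call the serializer walks the fields of the reply and fails on the payload, which matches the writeObject0 / defaultWriteFields frames in your stack trace. Within a single JVM no serialization happens, which is why passing the live object between local actors works.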

Cheers,
Till

On Tue, Nov 7, 2017 at 2:47 PM, XiangWei Huang <xw.huang...@gmail.com> wrote:

> Hi Till,
>
> Sorry, I made a mistake: I used
> *StandaloneClusterClient*#*getJobManagerGateway* to get an *ActorGateway*
> to communicate with the *JobManager*, instead of using a *JobMasterGateway*.
> Below is the code I executed to get the ExecutionGraph of a job.
>
>   val flinkConfig = new Configuration()
>   val flinkCli = new StandaloneClusterClient(flinkConfig)
>   val jobManagerGateWay = flinkCli.getJobManagerGateway
>   val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus,
>     new FiniteDuration(10, TimeUnit.SECONDS)).asInstanceOf[Future[RunningJobsStatus]]
>   val jobsStatus = Await.result(jobs,
>     new FiniteDuration(10, TimeUnit.SECONDS)).getStatusMessages().asScala.head
>   val jobId = jobsStatus.getJobId
>   val timeOut = new FiniteDuration(10, TimeUnit.SECONDS)
>   val future = jobManagerGateWay.ask(RequestJob(jobId), timeOut)
>   val result = Await.result(future, timeOut)
>
> The JobManager threw a NotSerializableException when I executed this code,
> so I wonder how this happened and whether there is another way to get a
> job's ExecutionGraph programmatically.
>
> Best,
> XiangWei
>
> 2017-11-07 17:16 GMT+08:00 Till Rohrmann <trohrm...@apache.org>:
>
>> Hi XiangWei,
>>
>> how do you use the JobMasterGateway with the actor message RequestJob?
>> The JobMasterGateway is a Java interface and does not represent an
>> ActorCell to which you can send actor messages. Instead you should call
>> JobMasterGateway#requestArchivedExecutionGraph.
>>
>> Cheers,
>> Till
>>
>> On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi XiangWei,
>>>
>>> I don't think this is a public interface, but Till (in CC) might know
>>> better.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-11-06 3:27 GMT+01:00 XiangWei Huang <xw.huang...@gmail.com>:
>>>
>>>> Hi Flink users,
>>>>
>>>> The Flink JobManager threw a NotSerializableException when I used the
>>>> JobMasterGateway to get the ExecutionGraph of a specific job with the
>>>> message *RequestJob(jobID)*. Below are the details of the exception:
>>>>
>>>> [ERROR] [akka.remote.EndpointWriter] - Transient association error (association remains live)
>>>> java.io.NotSerializableException: org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>>>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>>>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
>>>>     at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
>>>>     at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> So, is this a bug, or is this way of getting a job's ExecutionGraph
>>>> invalid?
>>>>
>>>> Best,
>>>> XiangWei