[FLINK-4526][yarn] remove redundant proxy messages. This closes #2437
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ec1efb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ec1efb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ec1efb Branch: refs/heads/release-1.1 Commit: f7ec1efb32c277a0d51479cca4450edb324a3d58 Parents: 4cdeb11 Author: Maximilian Michels <m...@apache.org> Authored: Tue Aug 30 10:46:22 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Mon Aug 29 18:20:30 2016 +0200 ---------------------------------------------------------------------- .../flink/client/program/ClusterClient.java | 3 +- .../apache/flink/yarn/YarnClusterClient.java | 41 +++++++------------- .../apache/flink/yarn/ApplicationClient.scala | 27 ------------- .../org/apache/flink/yarn/YarnMessages.scala | 7 ---- 4 files changed, 15 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index b8d3400..2a0aca3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -636,8 +636,7 @@ public abstract class ClusterClient { * @throws Exception */ public ActorGateway getJobManagerGateway() throws Exception { - LOG.info("Looking up JobManager"); - + LOG.debug("Looking up JobManager"); return LeaderRetrievalUtils.retrieveLeaderGateway( LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), actorSystemLoader.get(), 
http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index 75bfeed..8b6cd9a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -19,8 +19,6 @@ package org.apache.flink.yarn; import akka.actor.ActorRef; -import static akka.pattern.Patterns.ask; - import akka.actor.Props; import akka.pattern.Patterns; import akka.util.Timeout; @@ -30,8 +28,10 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -48,9 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.None$; import scala.Option; -import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -83,7 +81,6 @@ public class YarnClusterClient extends ClusterClient { private final AbstractYarnClusterDescriptor clusterDescriptor; private 
final LazApplicationClientLoader applicationClient; private final FiniteDuration akkaDuration; - private final Timeout akkaTimeout; private final ApplicationReport appReport; private final ApplicationId appId; private final String trackingURL; @@ -116,7 +113,6 @@ public class YarnClusterClient extends ClusterClient { super(flinkConfig); this.akkaDuration = AkkaUtils.getTimeout(flinkConfig); - this.akkaTimeout = Timeout.durationToTimeout(akkaDuration); this.clusterDescriptor = clusterDescriptor; this.yarnClient = yarnClient; this.hadoopConfig = yarnClient.getConfig(); @@ -175,12 +171,12 @@ public class YarnClusterClient extends ClusterClient { */ private void stopAfterJob(JobID jobID) { Preconditions.checkNotNull(jobID, "The job id must not be null"); - Future<Object> messageReceived = - ask( - applicationClient.get(), - new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout); try { - Await.result(messageReceived, akkaDuration); + Future<Object> replyFuture = + getJobManagerGateway().ask( + new ShutdownClusterAfterJob(jobID), + akkaDuration); + Await.ready(replyFuture, akkaDuration); } catch (Exception e) { throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e); } @@ -230,30 +226,21 @@ public class YarnClusterClient extends ClusterClient { @Override public GetClusterStatusResponse getClusterStatus() { if(!isConnected) { - throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); + throw new IllegalStateException("The cluster is not connected to the cluster."); } if(hasBeenShutdown()) { - return null; + throw new IllegalStateException("The cluster has already been shutdown."); } - Future<Object> clusterStatusOption = - ask( - applicationClient.get(), - YarnMessages.getLocalGetyarnClusterStatus(), - akkaTimeout); - Object clusterStatus; try { - clusterStatus = Await.result(clusterStatusOption, akkaDuration); + final Future<Object> clusterStatusOption = + 
getJobManagerGateway().ask( + GetClusterStatus.getInstance(), + akkaDuration); + return (GetClusterStatusResponse) Await.result(clusterStatusOption, akkaDuration); } catch (Exception e) { throw new RuntimeException("Unable to get ClusterClient status from Application Client", e); } - if(clusterStatus instanceof None$) { - throw new RuntimeException("Unable to get ClusterClient status from Application Client"); - } else if(clusterStatus instanceof Some) { - return (GetClusterStatusResponse) (((Some) clusterStatus).get()); - } else { - throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName()); - } } public ApplicationStatus getApplicationStatus() { http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index e701269..7442503 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -48,7 +48,6 @@ class ApplicationClient( with LeaderSessionMessageFilter with LogMessages with LeaderRetrievalListener{ - import context._ val log = Logger(getClass) @@ -60,7 +59,6 @@ class ApplicationClient( var pollingTimer: Option[Cancellable] = None var running = false var messagesQueue : mutable.Queue[InfoMessage] = mutable.Queue[InfoMessage]() - var latestClusterStatus : Option[GetClusterStatusResponse] = None var stopMessageReceiver : Option[ActorRef] = None var leaderSessionID: Option[UUID] = None @@ -136,19 +134,8 @@ class ApplicationClient( // The job manager acts as a proxy between the client and the resource managert val jm = sender() log.info(s"Successfully registered at the ResourceManager using JobManager $jm") - yarnJobManager = 
Some(jm) - // schedule a periodic status report from the JobManager - // request the number of task managers and slots from the job manager - pollingTimer = Some( - context.system.scheduler.schedule( - INITIAL_POLLING_DELAY, - WAIT_FOR_YARN_INTERVAL, - yarnJobManager.get, - decorateMessage(GetClusterStatus.getInstance())) - ) - case JobManagerLeaderAddress(jobManagerAkkaURL, newLeaderSessionID) => log.info(s"Received address of new leader $jobManagerAkkaURL with session ID" + s" $newLeaderSessionID.") @@ -192,20 +179,6 @@ class ApplicationClient( }(context.dispatcher) } - // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr - case status: GetClusterStatusResponse => - latestClusterStatus = Some(status) - - // locally get cluster status - case LocalGetYarnClusterStatus => - sender() ! decorateMessage(latestClusterStatus) - - // Forward message to Application Master - case LocalStopAMAfterJob(jobID) => - yarnJobManager foreach { - _ forward decorateMessage(new ShutdownClusterAfterJob(jobID)) - } - // ----------------- handle messages from the cluster ------------------- // receive remote messages case msg: InfoMessage => http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala index da1917b..ada2631 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala @@ -86,17 +86,10 @@ object YarnMessages { case object HeartbeatWithYarn case object CheckForUserCommand - // tell the AM to monitor the job and stop once it has finished - case class LocalStopAMAfterJob(jobId:JobID) - case object LocalGetYarnMessage // request new message - case object 
LocalGetYarnClusterStatus // request the latest cluster status def getLocalGetYarnMessage(): AnyRef = { LocalGetYarnMessage } - def getLocalGetyarnClusterStatus(): AnyRef = { - LocalGetYarnClusterStatus - } }