[FLINK-4526][yarn] remove redundant proxy messages

This closes #2437


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ec1efb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ec1efb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ec1efb

Branch: refs/heads/release-1.1
Commit: f7ec1efb32c277a0d51479cca4450edb324a3d58
Parents: 4cdeb11
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Aug 30 10:46:22 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Aug 29 18:20:30 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  3 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 41 +++++++-------------
 .../apache/flink/yarn/ApplicationClient.scala   | 27 -------------
 .../org/apache/flink/yarn/YarnMessages.scala    |  7 ----
 4 files changed, 15 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index b8d3400..2a0aca3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -636,8 +636,7 @@ public abstract class ClusterClient {
         * @throws Exception
         */
        public ActorGateway getJobManagerGateway() throws Exception {
-               LOG.info("Looking up JobManager");
-
+               LOG.debug("Looking up JobManager");
                return LeaderRetrievalUtils.retrieveLeaderGateway(
                        LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
                        actorSystemLoader.get(),

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 75bfeed..8b6cd9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,8 +19,6 @@ package org.apache.flink.yarn;
 
 import akka.actor.ActorRef;
 
-import static akka.pattern.Patterns.ask;
-
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -30,8 +28,10 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -48,9 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.None$;
 import scala.Option;
-import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -83,7 +81,6 @@ public class YarnClusterClient extends ClusterClient {
        private final AbstractYarnClusterDescriptor clusterDescriptor;
        private final LazApplicationClientLoader applicationClient;
        private final FiniteDuration akkaDuration;
-       private final Timeout akkaTimeout;
        private final ApplicationReport appReport;
        private final ApplicationId appId;
        private final String trackingURL;
@@ -116,7 +113,6 @@ public class YarnClusterClient extends ClusterClient {
                super(flinkConfig);
 
                this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
-               this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
                this.clusterDescriptor = clusterDescriptor;
                this.yarnClient = yarnClient;
                this.hadoopConfig = yarnClient.getConfig();
@@ -175,12 +171,12 @@ public class YarnClusterClient extends ClusterClient {
         */
        private void stopAfterJob(JobID jobID) {
                Preconditions.checkNotNull(jobID, "The job id must not be null");
-               Future<Object> messageReceived =
-                       ask(
-                               applicationClient.get(),
-                               new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
                try {
-                       Await.result(messageReceived, akkaDuration);
+                       Future<Object> replyFuture =
+                               getJobManagerGateway().ask(
+                                       new ShutdownClusterAfterJob(jobID),
+                                       akkaDuration);
+                       Await.ready(replyFuture, akkaDuration);
                } catch (Exception e) {
                        throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
                }
@@ -230,30 +226,21 @@ public class YarnClusterClient extends ClusterClient {
        @Override
        public GetClusterStatusResponse getClusterStatus() {
                if(!isConnected) {
-                       throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
+                       throw new IllegalStateException("The cluster is not connected to the cluster.");
                }
                if(hasBeenShutdown()) {
-                       return null;
+                       throw new IllegalStateException("The cluster has already been shutdown.");
                }
 
-               Future<Object> clusterStatusOption =
-                       ask(
-                               applicationClient.get(),
-                               YarnMessages.getLocalGetyarnClusterStatus(),
-                               akkaTimeout);
-               Object clusterStatus;
                try {
-                       clusterStatus = Await.result(clusterStatusOption, akkaDuration);
+                       final Future<Object> clusterStatusOption =
+                               getJobManagerGateway().ask(
+                                       GetClusterStatus.getInstance(),
+                                       akkaDuration);
+                       return (GetClusterStatusResponse) Await.result(clusterStatusOption, akkaDuration);
                } catch (Exception e) {
                        throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
                }
-               if(clusterStatus instanceof None$) {
-                       throw new RuntimeException("Unable to get ClusterClient status from Application Client");
-               } else if(clusterStatus instanceof Some) {
-                       return (GetClusterStatusResponse) (((Some) clusterStatus).get());
-               } else {
-                       throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName());
-               }
        }
 
        public ApplicationStatus getApplicationStatus() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index e701269..7442503 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -48,7 +48,6 @@ class ApplicationClient(
   with LeaderSessionMessageFilter
   with LogMessages
   with LeaderRetrievalListener{
-  import context._
 
   val log = Logger(getClass)
 
@@ -60,7 +59,6 @@ class ApplicationClient(
   var pollingTimer: Option[Cancellable] = None
   var running = false
   var messagesQueue : mutable.Queue[InfoMessage] = mutable.Queue[InfoMessage]()
-  var latestClusterStatus : Option[GetClusterStatusResponse] = None
   var stopMessageReceiver : Option[ActorRef] = None
 
   var leaderSessionID: Option[UUID] = None
@@ -136,19 +134,8 @@ class ApplicationClient(
       // The job manager acts as a proxy between the client and the resource managert
       val jm = sender()
       log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
-
       yarnJobManager = Some(jm)
 
-      // schedule a periodic status report from the JobManager
-      // request the number of task managers and slots from the job manager
-      pollingTimer = Some(
-        context.system.scheduler.schedule(
-          INITIAL_POLLING_DELAY,
-          WAIT_FOR_YARN_INTERVAL,
-          yarnJobManager.get,
-          decorateMessage(GetClusterStatus.getInstance()))
-      )
-
     case JobManagerLeaderAddress(jobManagerAkkaURL, newLeaderSessionID) =>
       log.info(s"Received address of new leader $jobManagerAkkaURL with session ID" +
         s" $newLeaderSessionID.")
@@ -192,20 +179,6 @@ class ApplicationClient(
           }(context.dispatcher)
       }
 
-    // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
-    case status: GetClusterStatusResponse =>
-      latestClusterStatus = Some(status)
-
-    // locally get cluster status
-    case LocalGetYarnClusterStatus =>
-      sender() ! decorateMessage(latestClusterStatus)
-
-    // Forward message to Application Master
-    case LocalStopAMAfterJob(jobID) =>
-      yarnJobManager foreach {
-        _ forward decorateMessage(new ShutdownClusterAfterJob(jobID))
-      }
-
     // -----------------  handle messages from the cluster -------------------
     // receive remote messages
     case msg: InfoMessage =>

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ec1efb/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index da1917b..ada2631 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -86,17 +86,10 @@ object YarnMessages {
   case object HeartbeatWithYarn
   case object CheckForUserCommand
 
-  // tell the AM to monitor the job and stop once it has finished
-  case class LocalStopAMAfterJob(jobId:JobID)
-
   case object LocalGetYarnMessage // request new message
-  case object LocalGetYarnClusterStatus // request the latest cluster status
 
   def getLocalGetYarnMessage(): AnyRef = {
     LocalGetYarnMessage
   }
 
-  def getLocalGetyarnClusterStatus(): AnyRef = {
-    LocalGetYarnClusterStatus
-  }
 }

Reply via email to