[FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway

Add more logging to MetricFetcher

This closes #4852.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6f3090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6f3090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6f3090

Branch: refs/heads/master
Commit: 9f6f30905fcae3ad415f4c37203fb2a94c793334
Parents: 1809cad
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 18 13:50:06 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 1 15:48:00 2017 +0100

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     | 31 ++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 33 ++++++++
 .../runtime/jobmaster/JobManagerRunner.java     |  4 +
 .../flink/runtime/metrics/MetricRegistry.java   | 23 ++++++
 .../resourcemanager/ResourceManager.java        | 21 +++++
 .../resourcemanager/ResourceManagerGateway.java | 11 +++
 .../handler/legacy/metrics/MetricFetcher.java   | 83 ++++++++++++--------
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  4 +
 .../runtime/webmonitor/RestfulGateway.java      | 20 +++++
 .../retriever/MetricQueryServiceGateway.java    |  2 +
 .../retriever/impl/AkkaQueryServiceGateway.java |  5 ++
 .../legacy/metrics/MetricFetcherTest.java       | 23 +++---
 12 files changed, 212 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 0a2d4d6..6896852 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.akka;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -36,14 +37,17 @@ import 
org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
 
 import scala.Option;
 import scala.reflect.ClassTag$;
@@ -252,6 +256,33 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        }
 
        @Override
+       public CompletableFuture<Collection<String>> 
requestMetricQueryServicePaths(Time timeout) {
+               final String jobManagerPath = getAddress();
+               final String jobManagerMetricQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+
+               return CompletableFuture.completedFuture(
+                       
Collections.singleton(jobManagerMetricQueryServicePath));
+       }
+
+       @Override
+       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               return requestTaskManagerInstances(timeout)
+                       .thenApply(
+                               (Collection<Instance> instances) ->
+                                       instances
+                                               .stream()
+                                               .map(
+                                                       (Instance instance) -> {
+                                                               final String 
taskManagerAddress = instance.getTaskManagerGateway().getAddress();
+                                                               final String 
taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) +
+                                                                       
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + 
instance.getTaskManagerID().getResourceIdString();
+
+                                                               return 
Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath);
+                                                       })
+                                               .collect(Collectors.toList()));
+       }
+
+       @Override
        public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time 
timeout) {
                return FutureUtils.toJava(
                        jobManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index efaebb1..dda0275 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -30,6 +31,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -51,13 +53,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -157,6 +163,12 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                }
 
                try {
+                       metricRegistry.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
                        super.postStop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
@@ -171,6 +183,11 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        public void start() throws Exception {
                super.start();
 
+               // start the MetricQueryService
+               // TODO: This is a temporary hack until we have ported the 
MetricQueryService to the new RpcEndpoint
+               final ActorSystem actorSystem = ((AkkaRpcService) 
getRpcService()).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, null);
+
                leaderElectionService.start(this);
        }
 
@@ -356,6 +373,22 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
+       public CompletableFuture<Collection<String>> 
requestMetricQueryServicePaths(Time timeout) {
+               final String metricQueryServicePath = 
metricRegistry.getMetricQueryServicePath();
+
+               if (metricQueryServicePath != null) {
+                       return 
CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+               } else {
+                       return 
CompletableFuture.completedFuture(Collections.emptyList());
+               }
+       }
+
+       @Override
+       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               return 
resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
+       }
+
+       @Override
        public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
                return 
CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 70abf2f..14baa6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -178,6 +178,10 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                }
        }
 
+       
//----------------------------------------------------------------------------------------------
+       // Getter
+       
//----------------------------------------------------------------------------------------------
+
        public JobMasterGateway getJobManagerGateway() {
                return jobManager.getSelfGateway(JobMasterGateway.class);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 2e07370..278292d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.View;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -43,6 +44,8 @@ import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TimerTask;
@@ -65,8 +68,13 @@ public class MetricRegistry {
 
        private List<MetricReporter> reporters;
        private ScheduledExecutorService executor;
+
+       @Nullable
        private ActorRef queryService;
 
+       @Nullable
+       private String metricQueryServicePath;
+
        private ViewUpdater viewUpdater;
 
        private final ScopeFormats scopeFormats;
@@ -87,6 +95,9 @@ public class MetricRegistry {
 
                this.executor = Executors.newSingleThreadScheduledExecutor(new 
ExecutorThreadFactory("Flink-MetricRegistry"));
 
+               this.queryService = null;
+               this.metricQueryServicePath = null;
+
                if (reporterConfigurations.isEmpty()) {
                        // no reporters defined
                        // by default, don't report anything
@@ -165,6 +176,7 @@ public class MetricRegistry {
 
                        try {
                                queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+                               metricQueryServicePath = 
AkkaUtils.getAkkaURL(actorSystem, queryService);
                        } catch (Exception e) {
                                LOG.warn("Could not start MetricDumpActor. No 
metrics will be submitted to the WebInterface.", e);
                        }
@@ -172,6 +184,16 @@ public class MetricRegistry {
        }
 
        /**
+        * Returns the address under which the {@link MetricQueryService} is 
reachable.
+        *
+        * @return address of the metric query service, or {@code null} if it has not been (successfully) started
+        */
+       @Nullable
+       public String getMetricQueryServicePath() {
+               return metricQueryServicePath;
+       }
+
+       /**
         * Returns the global delimiter.
         *
         * @return global delimiter
@@ -368,6 +390,7 @@ public class MetricRegistry {
        // 
------------------------------------------------------------------------
 
        @VisibleForTesting
+       @Nullable
        public ActorRef getQueryService() {
                return queryService;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 38cfd6e..1d4d4f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -42,6 +43,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import 
org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
@@ -59,6 +61,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -493,6 +497,23 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                numberFreeSlots));
        }
 
+       @Override
+       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               final ArrayList<Tuple2<InstanceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+
+               for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> 
workerRegistrationEntry : taskExecutors.entrySet()) {
+                       final ResourceID tmResourceId = 
workerRegistrationEntry.getKey();
+                       final WorkerRegistration<WorkerType> workerRegistration 
= workerRegistrationEntry.getValue();
+                       final String taskManagerAddress = 
workerRegistration.getTaskExecutorGateway().getAddress();
+                       final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+                               MetricQueryService.METRIC_QUERY_SERVICE_NAME + 
'_' + tmResourceId.getResourceIdString();
+
+                       
metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), 
tmMetricQueryServicePath));
+               }
+
+               return 
CompletableFuture.completedFuture(metricQueryServicePaths);
+       }
+
        // 
------------------------------------------------------------------------
        //  Internal methods
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index e0674b6..9eacb4b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -34,6 +36,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -166,4 +169,12 @@ public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManager
         * @return Future containing the resource overview
         */
        CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout 
Time timeout);
+
+       /**
+        * Requests the paths of the TaskManagers' {@link MetricQueryService}s 
to query.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the collection of instance ids and the 
corresponding metric query service path
+        */
+       CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index fa71c68..1bfb9f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -49,10 +49,10 @@ import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
  * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
  * the last call has passed.
  */
-public class MetricFetcher {
+public class MetricFetcher<T extends RestfulGateway> {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
 
-       private final GatewayRetriever<JobManagerGateway> retriever;
+       private final GatewayRetriever<T> retriever;
        private final MetricQueryServiceRetriever queryServiceRetriever;
        private final Executor executor;
        private final Time timeout;
@@ -63,7 +63,7 @@ public class MetricFetcher {
        private long lastUpdateTime;
 
        public MetricFetcher(
-                       GatewayRetriever<JobManagerGateway> retriever,
+                       GatewayRetriever<T> retriever,
                        MetricQueryServiceRetriever queryServiceRetriever,
                        Executor executor,
                        Time timeout) {
@@ -96,15 +96,20 @@ public class MetricFetcher {
        }
 
        private void fetchMetrics() {
+               LOG.debug("Start fetching metrics.");
+
                try {
-                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getNow();
-                       if (optJobManagerGateway.isPresent()) {
-                               final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
+                       Optional<T> optionalLeaderGateway = retriever.getNow();
+                       if (optionalLeaderGateway.isPresent()) {
+                               final T leaderGateway = 
optionalLeaderGateway.get();
 
                                /**
                                 * Remove all metrics that belong to a job that 
is not running and no longer archived.
                                 */
-                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = leaderGateway.requestJobDetails(
+                                       true,
+                                       true,
+                                       timeout);
 
                                jobDetailsFuture.whenCompleteAsync(
                                        (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
@@ -123,35 +128,41 @@ public class MetricFetcher {
                                        },
                                        executor);
 
-                               String jobManagerPath = 
jobManagerGateway.getAddress();
-                               String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-
-                               retrieveAndQueryMetrics(jmQueryServicePath);
+                               CompletableFuture<Collection<String>> 
queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout);
 
-                               /**
-                                * We first request the list of all registered 
task managers from the job manager, and then
-                                * request the respective metric dump from each 
task manager.
-                                *
-                                * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
-                                */
-                               CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-                               taskManagersFuture.whenCompleteAsync(
-                                       (Collection<Instance> taskManagers, 
Throwable throwable) -> {
+                               queryServicePathsFuture.whenCompleteAsync(
+                                       (Collection<String> queryServicePaths, 
Throwable throwable) -> {
                                                if (throwable != null) {
-                                                       LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
+                                                       LOG.warn("Requesting 
paths for query services failed.", throwable);
                                                } else {
-                                                       List<String> 
activeTaskManagers = taskManagers.stream().map(
-                                                               
taskManagerInstance -> {
-                                                                       final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
-                                                                       final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
-
-                                                                       
retrieveAndQueryMetrics(tmQueryServicePath);
+                                                       for (String 
queryServicePath : queryServicePaths) {
+                                                               
retrieveAndQueryMetrics(queryServicePath);
+                                                       }
+                                               }
+                                       },
+                                       executor);
 
-                                                                       return 
taskManagerInstance.getId().toString();
-                                                               
}).collect(Collectors.toList());
+                               // TODO: Once the old code has been ditched, 
remove the explicit TaskManager query service discovery
+                               // TODO: and return it as part of 
requestMetricQueryServicePaths. Moreover, change the MetricStore such that
+                               // TODO: we don't have to explicitly retain the 
valid TaskManagers, e.g. letting it be a cache with expiry time
+                               CompletableFuture<Collection<Tuple2<InstanceID, 
String>>> taskManagerQueryServicePathsFuture = leaderGateway
+                                       
.requestTaskManagerMetricQueryServicePaths(timeout);
 
-                                                       
metrics.retainTaskManagers(activeTaskManagers);
+                               
taskManagerQueryServicePathsFuture.whenCompleteAsync(
+                                       (Collection<Tuple2<InstanceID, String>> 
queryServicePaths, Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.warn("Requesting 
TaskManager's path for query services failed.", throwable);
+                                               } else {
+                                                       List<String> 
taskManagersToRetain = queryServicePaths
+                                                               .stream()
+                                                               .map(
+                                                                       
(Tuple2<InstanceID, String> tuple) -> {
+                                                                               
retrieveAndQueryMetrics(tuple.f1);
+                                                                               
return tuple.f0.toString();
+                                                                       }
+                                                               
).collect(Collectors.toList());
+
+                                                       
metrics.retainTaskManagers(taskManagersToRetain);
                                                }
                                        },
                                        executor);
@@ -167,6 +178,8 @@ public class MetricFetcher {
         * @param queryServicePath specifying the QueryServiceGateway
         */
        private void retrieveAndQueryMetrics(String queryServicePath) {
+               LOG.debug("Retrieve metric query service gateway for {}", 
queryServicePath);
+
                final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
 
                queryServiceGatewayFuture.whenCompleteAsync(
@@ -186,6 +199,8 @@ public class MetricFetcher {
         * @param queryServiceGateway to query for metrics
         */
        private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
+               LOG.debug("Query metrics for {}.", 
queryServiceGateway.getAddress());
+
                queryServiceGateway
                        .queryMetrics(timeout)
                        .whenCompleteAsync(

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4ad7e70..68b5aaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -133,6 +133,10 @@ public class AkkaRpcService implements RpcService {
                stopped = false;
        }
 
+       public ActorSystem getActorSystem() {
+               return actorSystem;
+       }
+
        @Override
        public String getAddress() {
                return address;

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index b2fc026..d871b06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -76,4 +80,20 @@ public interface RestfulGateway extends RpcGateway {
         * @return Future containing the status overview
         */
        CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout 
Time timeout);
+
+       /**
+        * Requests the paths for the {@link MetricQueryService} to query.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the collection of metric query service 
paths to query
+        */
+       CompletableFuture<Collection<String>> 
requestMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+       /**
+        * Requests the paths for the TaskManager's {@link MetricQueryService} 
to query.
+        *
+        * @param timeout for the asynchronous operation
+        * @return Future containing the collection of instance ids and the 
corresponding metric query service path
+        */
+       CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
index c79bf5d..8368993 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
@@ -33,4 +33,6 @@ import java.util.concurrent.CompletableFuture;
 public interface MetricQueryServiceGateway {
 
        CompletableFuture<MetricDumpSerialization.MetricSerializationResult> 
queryMetrics(Time timeout);
+
+       String getAddress();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
index 8985205..c53dce0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
@@ -50,4 +50,9 @@ public class AkkaQueryServiceGateway implements 
MetricQueryServiceGateway {
                                
.mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
                );
        }
+
+       @Override
+       public String getAddress() {
+               return queryServiceActorRef.path().toString();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index a6eaf2f..ce98f31 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,9 +28,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -76,25 +74,22 @@ public class MetricFetcherTest extends TestLogger {
                JobID jobID = new JobID();
                InstanceID tmID = new InstanceID();
                ResourceID tmRID = new ResourceID(tmID.toString());
-               TaskManagerGateway taskManagerGateway = 
mock(TaskManagerGateway.class);
-               when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
-               Instance taskManager = mock(Instance.class);
-               
when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-               when(taskManager.getId()).thenReturn(tmID);
-               when(taskManager.getTaskManagerID()).thenReturn(tmRID);
 
                // ========= setup JobManager 
==================================================================================
                JobDetails details = mock(JobDetails.class);
                when(details.getJobId()).thenReturn(jobID);
 
+               final String jmMetricQueryServicePath = "/jm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+               final String tmMetricQueryServicePath = "/tm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
tmRID.getResourceIdString();
+
                JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
 
                when(jobManagerGateway.requestJobDetails(anyBoolean(), 
anyBoolean(), any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(new 
MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
-               
when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
-                       
.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
-               when(jobManagerGateway.getAddress()).thenReturn("/jm/address");
+               
when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
+                       
CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
+               
when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
+                       
CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, 
tmMetricQueryServicePath))));
 
                GatewayRetriever<JobManagerGateway> retriever = 
mock(AkkaJobManagerRetriever.class);
                when(retriever.getNow())
@@ -112,8 +107,8 @@ public class MetricFetcherTest extends TestLogger {
                        
.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
 
                MetricQueryServiceRetriever queryServiceRetriever = 
mock(MetricQueryServiceRetriever.class);
-               when(queryServiceRetriever.retrieveService(eq("/jm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-               when(queryServiceRetriever.retrieveService(eq("/tm/" + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+               
when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
+               
when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
 
                // ========= start MetricFetcher testing 
=======================================================================
                MetricFetcher fetcher = new MetricFetcher(

Reply via email to