[FLINK-7863] Generalize MetricFetcher to work with a RestfulGateway. Add more logging to MetricFetcher.
This closes #4852. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f6f3090 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f6f3090 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f6f3090 Branch: refs/heads/master Commit: 9f6f30905fcae3ad415f4c37203fb2a94c793334 Parents: 1809cad Author: Till Rohrmann <[email protected]> Authored: Wed Oct 18 13:50:06 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 1 15:48:00 2017 +0100 ---------------------------------------------------------------------- .../runtime/akka/AkkaJobManagerGateway.java | 31 ++++++++ .../flink/runtime/dispatcher/Dispatcher.java | 33 ++++++++ .../runtime/jobmaster/JobManagerRunner.java | 4 + .../flink/runtime/metrics/MetricRegistry.java | 23 ++++++ .../resourcemanager/ResourceManager.java | 21 +++++ .../resourcemanager/ResourceManagerGateway.java | 11 +++ .../handler/legacy/metrics/MetricFetcher.java | 83 ++++++++++++-------- .../flink/runtime/rpc/akka/AkkaRpcService.java | 4 + .../runtime/webmonitor/RestfulGateway.java | 20 +++++ .../retriever/MetricQueryServiceGateway.java | 2 + .../retriever/impl/AkkaQueryServiceGateway.java | 5 ++ .../legacy/metrics/MetricFetcherTest.java | 23 +++--- 12 files changed, 212 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java index 0a2d4d6..6896852 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.akka; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.ActorGateway; @@ -36,14 +37,17 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview; import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import java.util.Collection; +import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; import scala.Option; import scala.reflect.ClassTag$; @@ -252,6 +256,33 @@ public class AkkaJobManagerGateway implements JobManagerGateway { } @Override + public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) { + final String jobManagerPath = getAddress(); + final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; + + return CompletableFuture.completedFuture( + Collections.singleton(jobManagerMetricQueryServicePath)); + } + + @Override + public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { + return requestTaskManagerInstances(timeout) + .thenApply( + (Collection<Instance> instances) -> + instances + 
.stream() + .map( + (Instance instance) -> { + final String taskManagerAddress = instance.getTaskManagerGateway().getAddress(); + final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString(); + + return Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath); + }) + .collect(Collectors.toList())); + } + + @Override public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) { return FutureUtils.toJava( jobManagerGateway http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index efaebb1..dda0275 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobSubmissionException; @@ -30,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.instance.InstanceID; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -51,13 +53,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import akka.actor.ActorSystem; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -157,6 +163,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } try { + metricRegistry.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { super.postStop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); @@ -171,6 +183,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme public void start() throws Exception { super.start(); + // start the MetricQueryService + // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint + final ActorSystem actorSystem = ((AkkaRpcService) getRpcService()).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); + leaderElectionService.start(this); } @@ -356,6 +373,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override + public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) { + final String metricQueryServicePath = metricRegistry.getMetricQueryServicePath(); + + if (metricQueryServicePath != null) { + return 
CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath)); + } else { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + } + + @Override + public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { + return resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout); + } + + @Override public CompletableFuture<Integer> getBlobServerPort(Time timeout) { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 70abf2f..14baa6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -178,6 +178,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F } } + //---------------------------------------------------------------------------------------------- + // Getter + //---------------------------------------------------------------------------------------------- + public JobMasterGateway getJobManagerGateway() { return jobManager.getSelfGateway(JobMasterGateway.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index 
2e07370..278292d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.View; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; @@ -43,6 +44,8 @@ import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.List; import java.util.TimerTask; @@ -65,8 +68,13 @@ public class MetricRegistry { private List<MetricReporter> reporters; private ScheduledExecutorService executor; + + @Nullable private ActorRef queryService; + @Nullable + private String metricQueryServicePath; + private ViewUpdater viewUpdater; private final ScopeFormats scopeFormats; @@ -87,6 +95,9 @@ public class MetricRegistry { this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry")); + this.queryService = null; + this.metricQueryServicePath = null; + if (reporterConfigurations.isEmpty()) { // no reporters defined // by default, don't report anything @@ -165,6 +176,7 @@ public class MetricRegistry { try { queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID); + metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService); } catch (Exception e) { LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e); } @@ -172,6 +184,16 @@ public class MetricRegistry { } /** + * Returns the address under which the {@link MetricQueryService} is reachable. 
+ * + * @return address of the metric query service + */ + @Nullable + public String getMetricQueryServicePath() { + return metricQueryServicePath; + } + + /** * Returns the global delimiter. * * @return global delimiter @@ -368,6 +390,7 @@ public class MetricRegistry { // ------------------------------------------------------------------------ @VisibleForTesting + @Nullable public ActorRef getQueryService() { return queryService; } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 38cfd6e..1d4d4f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.registration.RegistrationResponse; import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration; @@ -59,6 +61,8 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -493,6 +497,23 @@ public abstract class ResourceManager<WorkerType extends Serializable> numberFreeSlots)); } + @Override + public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) { + final ArrayList<Tuple2<InstanceID, String>> metricQueryServicePaths = new ArrayList<>(taskExecutors.size()); + + for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> workerRegistrationEntry : taskExecutors.entrySet()) { + final ResourceID tmResourceId = workerRegistrationEntry.getKey(); + final WorkerRegistration<WorkerType> workerRegistration = workerRegistrationEntry.getValue(); + final String taskManagerAddress = workerRegistration.getTaskExecutorGateway().getAddress(); + final String tmMetricQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + tmResourceId.getResourceIdString(); + + metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), tmMetricQueryServicePath)); + } + + return CompletableFuture.completedFuture(metricQueryServicePaths); + } + // ------------------------------------------------------------------------ // Internal methods // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index e0674b6..9eacb4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -27,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -34,6 +36,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -166,4 +169,12 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager * @return Future containing the resource overview */ CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout Time timeout); + + /** + * Requests the paths for the TaskManager's {@link MetricQueryService} to query. 
+ * + * @param timeout for the asynchronous operation + * @return Future containing the collection of instance ids and the corresponding metric query service path + */ + CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index fa71c68..1bfb9f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; -import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; @@ -49,10 +49,10 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr * 
<p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since * the last call has passed. */ -public class MetricFetcher { +public class MetricFetcher<T extends RestfulGateway> { private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); - private final GatewayRetriever<JobManagerGateway> retriever; + private final GatewayRetriever<T> retriever; private final MetricQueryServiceRetriever queryServiceRetriever; private final Executor executor; private final Time timeout; @@ -63,7 +63,7 @@ public class MetricFetcher { private long lastUpdateTime; public MetricFetcher( - GatewayRetriever<JobManagerGateway> retriever, + GatewayRetriever<T> retriever, MetricQueryServiceRetriever queryServiceRetriever, Executor executor, Time timeout) { @@ -96,15 +96,20 @@ public class MetricFetcher { } private void fetchMetrics() { + LOG.debug("Start fetching metrics."); + try { - Optional<JobManagerGateway> optJobManagerGateway = retriever.getNow(); - if (optJobManagerGateway.isPresent()) { - final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); + Optional<T> optionalLeaderGateway = retriever.getNow(); + if (optionalLeaderGateway.isPresent()) { + final T leaderGateway = optionalLeaderGateway.get(); /** * Remove all metrics that belong to a job that is not running and no longer archived. 
*/ - CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails( + true, + true, + timeout); jobDetailsFuture.whenCompleteAsync( (MultipleJobsDetails jobDetails, Throwable throwable) -> { @@ -123,35 +128,41 @@ public class MetricFetcher { }, executor); - String jobManagerPath = jobManagerGateway.getAddress(); - String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - - retrieveAndQueryMetrics(jmQueryServicePath); + CompletableFuture<Collection<String>> queryServicePathsFuture = leaderGateway.requestMetricQueryServicePaths(timeout); - /** - * We first request the list of all registered task managers from the job manager, and then - * request the respective metric dump from each task manager. - * - * <p>All stored metrics that do not belong to a registered task manager will be removed. 
- */ - CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); - - taskManagersFuture.whenCompleteAsync( - (Collection<Instance> taskManagers, Throwable throwable) -> { + queryServicePathsFuture.whenCompleteAsync( + (Collection<String> queryServicePaths, Throwable throwable) -> { if (throwable != null) { - LOG.debug("Fetching list of registered TaskManagers failed.", throwable); + LOG.warn("Requesting paths for query services failed.", throwable); } else { - List<String> activeTaskManagers = taskManagers.stream().map( - taskManagerInstance -> { - final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); - final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); - - retrieveAndQueryMetrics(tmQueryServicePath); + for (String queryServicePath : queryServicePaths) { + retrieveAndQueryMetrics(queryServicePath); + } + } + }, + executor); - return taskManagerInstance.getId().toString(); - }).collect(Collectors.toList()); + // TODO: Once the old code has been ditched, remove the explicit TaskManager query service discovery + // TODO: and return it as part of requestQueryServicePaths. Moreover, change the MetricStore such that + // TODO: we don't have to explicitly retain the valid TaskManagers, e.g. 
letting it be a cache with expiry time + CompletableFuture<Collection<Tuple2<InstanceID, String>>> taskManagerQueryServicePathsFuture = leaderGateway + .requestTaskManagerMetricQueryServicePaths(timeout); - metrics.retainTaskManagers(activeTaskManagers); + taskManagerQueryServicePathsFuture.whenCompleteAsync( + (Collection<Tuple2<InstanceID, String>> queryServicePaths, Throwable throwable) -> { + if (throwable != null) { + LOG.warn("Requesting TaskManager's path for query services failed.", throwable); + } else { + List<String> taskManagersToRetain = queryServicePaths + .stream() + .map( + (Tuple2<InstanceID, String> tuple) -> { + retrieveAndQueryMetrics(tuple.f1); + return tuple.f0.toString(); + } + ).collect(Collectors.toList()); + + metrics.retainTaskManagers(taskManagersToRetain); } }, executor); @@ -167,6 +178,8 @@ public class MetricFetcher { * @param queryServicePath specifying the QueryServiceGateway */ private void retrieveAndQueryMetrics(String queryServicePath) { + LOG.debug("Retrieve metric query service gateway for {}", queryServicePath); + final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); queryServiceGatewayFuture.whenCompleteAsync( @@ -186,6 +199,8 @@ public class MetricFetcher { * @param queryServiceGateway to query for metrics */ private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { + LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress()); + queryServiceGateway .queryMetrics(timeout) .whenCompleteAsync( http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 4ad7e70..68b5aaa 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -133,6 +133,10 @@ public class AkkaRpcService implements RpcService { stopped = false; } + public ActorSystem getActorSystem() { + return actorSystem; + } + @Override public String getAddress() { return address; http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index b2fc026..d871b06 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -20,14 +20,18 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -76,4 +80,20 @@ public interface RestfulGateway extends RpcGateway { * @return Future containing the status overview */ CompletableFuture<ClusterOverview> 
requestClusterOverview(@RpcTimeout Time timeout); + + /** + * Requests the paths for the {@link MetricQueryService} to query. + * + * @param timeout for the asynchronous operation + * @return Future containing the collection of metric query service paths to query + */ + CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** + * Requests the paths for the TaskManager's {@link MetricQueryService} to query. + * + * @param timeout for the asynchronous operation + * @return Future containing the collection of instance ids and the corresponding metric query service path + */ + CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java index c79bf5d..8368993 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java @@ -33,4 +33,6 @@ import java.util.concurrent.CompletableFuture; public interface MetricQueryServiceGateway { CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout); + + String getAddress(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java index 8985205..c53dce0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java @@ -50,4 +50,9 @@ public class AkkaQueryServiceGateway implements MetricQueryServiceGateway { .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class)) ); } + + @Override + public String getAddress() { + return queryServiceActorRef.path().toString(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f6f3090/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index a6eaf2f..ce98f31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -28,9 +28,7 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; 
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -76,25 +74,22 @@ public class MetricFetcherTest extends TestLogger { JobID jobID = new JobID(); InstanceID tmID = new InstanceID(); ResourceID tmRID = new ResourceID(tmID.toString()); - TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); - when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); - - Instance taskManager = mock(Instance.class); - when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway); - when(taskManager.getId()).thenReturn(tmID); - when(taskManager.getTaskManagerID()).thenReturn(tmRID); // ========= setup JobManager ================================================================================== JobDetails details = mock(JobDetails.class); when(details.getJobId()).thenReturn(jobID); + final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME; + final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString(); + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class))) .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList()))); - when(jobManagerGateway.requestTaskManagerInstances(any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager))); - when(jobManagerGateway.getAddress()).thenReturn("/jm/address"); + when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn( + CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath))); + when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn( + CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, tmMetricQueryServicePath)))); GatewayRetriever<JobManagerGateway> retriever = 
mock(AkkaJobManagerRetriever.class); when(retriever.getNow()) @@ -112,8 +107,8 @@ public class MetricFetcherTest extends TestLogger { .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer)); MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class); - when(queryServiceRetriever.retrieveService(eq("/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME))).thenReturn(CompletableFuture.completedFuture(jmQueryService)); - when(queryServiceRetriever.retrieveService(eq("/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString()))).thenReturn(CompletableFuture.completedFuture(tmQueryService)); + when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService)); + when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService)); // ========= start MetricFetcher testing ======================================================================= MetricFetcher fetcher = new MetricFetcher(
