Repository: flink
Updated Branches:
  refs/heads/master 742e4a0ff -> 660a45ca1


[FLINK-7653] Properly implement Dispatcher#requestClusterOverview

This commit implements the ClusterOverview generation on the Dispatcher. In
order to do this, the Dispatcher requests the ResourceOverview from the
ResourceManager and the job status from all JobMasters. After receiving all
information, it is compiled into the ClusterOverview.

Note: StatusOverview has been renamed to ClusterOverview

This closes #4793.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/660a45ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/660a45ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/660a45ca

Branch: refs/heads/master
Commit: 660a45ca1a67ade234652482e8e41c79ab674d3d
Parents: 742e4a0
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 11 18:08:03 2017 +0200
Committer: Till <[email protected]>
Committed: Thu Oct 12 18:30:25 2017 +0200

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     |   6 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  67 ++++++++--
 .../dispatcher/DispatcherRestEndpoint.java      |   4 +-
 .../dispatcher/StandaloneDispatcher.java        |   3 +
 .../entrypoint/SessionClusterEntrypoint.java    |   4 +
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +
 .../runtime/jobmaster/JobMasterGateway.java     |   9 ++
 .../messages/webmonitor/ClusterOverview.java    | 125 ++++++++++++++++++
 .../webmonitor/RequestStatusOverview.java       |   2 +-
 .../messages/webmonitor/StatusOverview.java     | 125 ------------------
 .../resourcemanager/ResourceManager.java        |  12 ++
 .../resourcemanager/ResourceManagerGateway.java |   9 ++
 .../resourcemanager/ResourceOverview.java       |  53 ++++++++
 .../slotmanager/SlotManager.java                |  19 +--
 .../handler/legacy/ClusterOverviewHandler.java  |  26 ++--
 .../messages/ClusterOverviewWithVersion.java    | 130 +++++++++++++++++++
 .../rest/messages/ClusterOverviewHeaders.java   |   7 +-
 .../messages/StatusOverviewWithVersion.java     | 129 ------------------
 .../runtime/webmonitor/RestfulGateway.java      |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/dispatcher/DispatcherTest.java      |   5 +
 .../messages/WebMonitorMessagesTest.java        |   4 +-
 .../ClusterOverviewWithVersionTest.java         |  46 +++++++
 .../messages/StatusOverviewWithVersionTest.java |  44 -------
 24 files changed, 495 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 2a2c414..0a2d4d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -30,12 +30,12 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -244,11 +244,11 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        }
 
        @Override
-       public CompletableFuture<StatusOverview> requestStatusOverview(Time 
timeout) {
+       public CompletableFuture<ClusterOverview> requestClusterOverview(Time 
timeout) {
                return FutureUtils.toJava(
                        jobManagerGateway
                                .ask(RequestStatusOverview.getInstance(), 
FutureUtils.toFiniteDuration(timeout))
-                               
.mapTo(ClassTag$.MODULE$.apply(StatusOverview.class)));
+                               
.mapTo(ClassTag$.MODULE$.apply(ClusterOverview.class)));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index bce2bed..efaebb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -40,10 +41,12 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -77,6 +80,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        private final RunningJobsRegistry runningJobsRegistry;
 
        private final HighAvailabilityServices highAvailabilityServices;
+       private final ResourceManagerGateway resourceManagerGateway;
        private final JobManagerServices jobManagerServices;
        private final HeartbeatServices heartbeatServices;
        private final MetricRegistry metricRegistry;
@@ -94,6 +98,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        String endpointId,
                        Configuration configuration,
                        HighAvailabilityServices highAvailabilityServices,
+                       ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
@@ -103,6 +108,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                this.configuration = Preconditions.checkNotNull(configuration);
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
+               this.resourceManagerGateway = 
Preconditions.checkNotNull(resourceManagerGateway);
                this.jobManagerServices = JobManagerServices.fromConfiguration(
                        configuration,
                        Preconditions.checkNotNull(blobServer));
@@ -271,17 +277,54 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
-       public CompletableFuture<StatusOverview> requestStatusOverview(Time 
timeout) {
-               // TODO: Implement proper cluster overview generation
-               return CompletableFuture.completedFuture(
-                       new StatusOverview(
-                               42,
-                               1337,
-                               1337,
-                               5,
-                               6,
-                               7,
-                               8));
+       public CompletableFuture<ClusterOverview> requestClusterOverview(Time 
timeout) {
+               CompletableFuture<ResourceOverview> taskManagerOverviewFuture = 
resourceManagerGateway.requestResourceOverview(timeout);
+
+               ArrayList<CompletableFuture<JobStatus>> jobStatus = new 
ArrayList<>(jobManagerRunners.size());
+
+               for (Map.Entry<JobID, JobManagerRunner> jobManagerRunnerEntry : 
jobManagerRunners.entrySet()) {
+                       CompletableFuture<JobStatus> jobStatusFuture = 
jobManagerRunnerEntry.getValue().getJobManagerGateway().requestJobStatus(timeout);
+
+                       jobStatus.add(jobStatusFuture);
+               }
+
+               CompletableFuture<Collection<JobStatus>> allJobsFuture = 
FutureUtils.combineAll(jobStatus);
+
+               return allJobsFuture.thenCombine(
+                       taskManagerOverviewFuture,
+                       (Collection<JobStatus> allJobsStatus, ResourceOverview 
resourceOverview) -> {
+
+                               int numberRunningOrPendingJobs = 0;
+                               int numberFinishedJobs = 0;
+                               int numberCancelledJobs = 0;
+                               int numberFailedJobs = 0;
+
+                               for (JobStatus status : allJobsStatus) {
+                                       switch (status) {
+                                               case FINISHED:
+                                                       numberFinishedJobs++;
+                                                       break;
+                                               case FAILED:
+                                                       numberFailedJobs++;
+                                                       break;
+                                               case CANCELED:
+                                                       numberCancelledJobs++;
+                                                       break;
+                                               default:
+                                                       
numberRunningOrPendingJobs++;
+                                                       break;
+                                       }
+                               }
+
+                               return new ClusterOverview(
+                                       
resourceOverview.getNumberTaskManagers(),
+                                       
resourceOverview.getNumberRegisteredSlots(),
+                                       resourceOverview.getNumberFreeSlots(),
+                                       numberRunningOrPendingJobs,
+                                       numberFinishedJobs,
+                                       numberCancelledJobs,
+                                       numberFailedJobs);
+                       });
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index d2c1428..c23bb98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
@@ -54,7 +55,6 @@ import 
org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
-import org.apache.flink.runtime.rest.messages.StatusOverviewWithVersion;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -113,7 +113,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
 
                final Time timeout = restConfiguration.getTimeout();
 
-               LegacyRestHandlerAdapter<DispatcherGateway, 
StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new 
LegacyRestHandlerAdapter<>(
+               LegacyRestHandlerAdapter<DispatcherGateway, 
ClusterOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = 
new LegacyRestHandlerAdapter<>(
                        restAddressFuture,
                        leaderRetriever,
                        timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index efd565a..5a6889e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -45,6 +46,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        String endpointId,
                        Configuration configuration,
                        HighAvailabilityServices highAvailabilityServices,
+                       ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
@@ -55,6 +57,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        endpointId,
                        configuration,
                        highAvailabilityServices,
+                       resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index be34b0c..e24e01a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -100,6 +101,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        configuration,
                        rpcService,
                        highAvailabilityServices,
+                       
resourceManager.getSelfGateway(ResourceManagerGateway.class),
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
@@ -168,6 +170,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
+               ResourceManagerGateway resourceManagerGateway,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
@@ -180,6 +183,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        Dispatcher.DISPATCHER_NAME,
                        configuration,
                        highAvailabilityServices,
+                       resourceManagerGateway,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7efcc0b..8f87b44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -750,6 +750,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                return 
CompletableFuture.completedFuture(executionGraph.archive());
        }
 
+       @Override
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               return 
CompletableFuture.completedFuture(executionGraph.getState());
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Internal methods
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0628976..d59feed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -249,4 +250,12 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway, FencedRp
         * @return Future archived execution graph derived from the currently 
executed job
         */
        CompletableFuture<AccessExecutionGraph> 
requestArchivedExecutionGraph(@RpcTimeout Time timeout);
+
+       /**
+        * Requests the current job status.
+        *
+        * @param timeout for the rpc call
+        * @return Future containing the current job status
+        */
+       CompletableFuture<JobStatus> requestJobStatus(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
new file mode 100644
index 0000000..8fe2f52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response to the {@link RequestStatusOverview} message, carrying a 
description
+ * of the Flink cluster status.
+ */
+public class ClusterOverview extends JobsOverview {
+
+       private static final long serialVersionUID = -729861859715105265L;
+
+       public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
+       public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
+       public static final String FIELD_NAME_SLOTS_AVAILABLE = 
"slots-available";
+
+       @JsonProperty(FIELD_NAME_TASKMANAGERS)
+       private final int numTaskManagersConnected;
+
+       @JsonProperty(FIELD_NAME_SLOTS_TOTAL)
+       private final int numSlotsTotal;
+
+       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
+       private final int numSlotsAvailable;
+
+       @JsonCreator
+       public ClusterOverview(
+                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
+                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int 
numJobsFailed) {
+
+               super(numJobsRunningOrPending, numJobsFinished, 
numJobsCancelled, numJobsFailed);
+               
+               this.numTaskManagersConnected = numTaskManagersConnected;
+               this.numSlotsTotal = numSlotsTotal;
+               this.numSlotsAvailable = numSlotsAvailable;
+       }
+
+       public ClusterOverview(int numTaskManagersConnected, int numSlotsTotal, 
int numSlotsAvailable,
+                                                  JobsOverview jobs1, 
JobsOverview jobs2) {
+               super(jobs1, jobs2);
+               this.numTaskManagersConnected = numTaskManagersConnected;
+               this.numSlotsTotal = numSlotsTotal;
+               this.numSlotsAvailable = numSlotsAvailable;
+       }
+
+       public int getNumTaskManagersConnected() {
+               return numTaskManagersConnected;
+       }
+
+       public int getNumSlotsTotal() {
+               return numSlotsTotal;
+       }
+
+       public int getNumSlotsAvailable() {
+               return numSlotsAvailable;
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               else if (obj instanceof ClusterOverview) {
+                       ClusterOverview that = (ClusterOverview) obj;
+                       return this.numTaskManagersConnected == 
that.numTaskManagersConnected &&
+                                       this.numSlotsTotal == 
that.numSlotsTotal &&
+                                       this.numSlotsAvailable == 
that.numSlotsAvailable &&
+                                       this.getNumJobsRunningOrPending() == 
that.getNumJobsRunningOrPending() &&
+                                       this.getNumJobsFinished() == 
that.getNumJobsFinished() &&
+                                       this.getNumJobsCancelled() == 
that.getNumJobsCancelled() &&
+                                       this.getNumJobsFailed() == 
that.getNumJobsFailed();
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               int result = super.hashCode();
+               result = 31 * result + numTaskManagersConnected;
+               result = 31 * result + numSlotsTotal;
+               result = 31 * result + numSlotsAvailable;
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "StatusOverview {" +
+                               "numTaskManagersConnected=" + 
numTaskManagersConnected +
+                               ", numSlotsTotal=" + numSlotsTotal +
+                               ", numSlotsAvailable=" + numSlotsAvailable +
+                               ", numJobsRunningOrPending=" + 
getNumJobsRunningOrPending() +
+                               ", numJobsFinished=" + getNumJobsFinished() +
+                               ", numJobsCancelled=" + getNumJobsCancelled() +
+                               ", numJobsFailed=" + getNumJobsFailed() +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
index c3797fd..e912c06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestStatusOverview.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages.webmonitor;
 /**
  * This message requests an overview of the status, such as how many 
TaskManagers
  * are currently connected, how many slots are available, how many are free, 
...
- * The response to this message is a {@link StatusOverview} message.
+ * The response to this message is a {@link ClusterOverview} message.
  */
 public class RequestStatusOverview implements InfoMessage {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
deleted file mode 100644
index 2c04b7e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverview.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response to the {@link RequestStatusOverview} message, carrying a 
description
- * of the Flink cluster status.
- */
-public class StatusOverview extends JobsOverview {
-
-       private static final long serialVersionUID = -729861859715105265L;
-
-       public static final String FIELD_NAME_TASKMANAGERS = "taskmanagers";
-       public static final String FIELD_NAME_SLOTS_TOTAL = "slots-total";
-       public static final String FIELD_NAME_SLOTS_AVAILABLE = 
"slots-available";
-
-       @JsonProperty(FIELD_NAME_TASKMANAGERS)
-       private final int numTaskManagersConnected;
-
-       @JsonProperty(FIELD_NAME_SLOTS_TOTAL)
-       private final int numSlotsTotal;
-
-       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE)
-       private final int numSlotsAvailable;
-
-       @JsonCreator
-       public StatusOverview(
-                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
-                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
-                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
-                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
-                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
-                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
-                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int 
numJobsFailed) {
-
-               super(numJobsRunningOrPending, numJobsFinished, 
numJobsCancelled, numJobsFailed);
-               
-               this.numTaskManagersConnected = numTaskManagersConnected;
-               this.numSlotsTotal = numSlotsTotal;
-               this.numSlotsAvailable = numSlotsAvailable;
-       }
-
-       public StatusOverview(int numTaskManagersConnected, int numSlotsTotal, 
int numSlotsAvailable,
-                                                       JobsOverview jobs1, 
JobsOverview jobs2) {
-               super(jobs1, jobs2);
-               this.numTaskManagersConnected = numTaskManagersConnected;
-               this.numSlotsTotal = numSlotsTotal;
-               this.numSlotsAvailable = numSlotsAvailable;
-       }
-
-       public int getNumTaskManagersConnected() {
-               return numTaskManagersConnected;
-       }
-
-       public int getNumSlotsTotal() {
-               return numSlotsTotal;
-       }
-
-       public int getNumSlotsAvailable() {
-               return numSlotsAvailable;
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (this == obj) {
-                       return true;
-               }
-               else if (obj instanceof  StatusOverview) {
-                       StatusOverview that = (StatusOverview) obj;
-                       return this.numTaskManagersConnected == 
that.numTaskManagersConnected &&
-                                       this.numSlotsTotal == 
that.numSlotsTotal &&
-                                       this.numSlotsAvailable == 
that.numSlotsAvailable &&
-                                       this.getNumJobsRunningOrPending() == 
that.getNumJobsRunningOrPending() &&
-                                       this.getNumJobsFinished() == 
that.getNumJobsFinished() &&
-                                       this.getNumJobsCancelled() == 
that.getNumJobsCancelled() &&
-                                       this.getNumJobsFailed() == 
that.getNumJobsFailed();
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               int result = super.hashCode();
-               result = 31 * result + numTaskManagersConnected;
-               result = 31 * result + numSlotsTotal;
-               result = 31 * result + numSlotsAvailable;
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return "StatusOverview {" +
-                               "numTaskManagersConnected=" + 
numTaskManagersConnected +
-                               ", numSlotsTotal=" + numSlotsTotal +
-                               ", numSlotsAvailable=" + numSlotsAvailable +
-                               ", numJobsRunningOrPending=" + 
getNumJobsRunningOrPending() +
-                               ", numJobsFinished=" + getNumJobsFinished() +
-                               ", numJobsCancelled=" + getNumJobsCancelled() +
-                               ", numJobsFailed=" + getNumJobsFailed() +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f69998c..d636ba4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -480,6 +480,18 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                return CompletableFuture.completedFuture(taskExecutors.size());
        }
 
+       @Override
+       public CompletableFuture<ResourceOverview> requestResourceOverview(Time 
timeout) {
+               final int numberSlots = slotManager.getNumberRegisteredSlots();
+               final int numberFreeSlots = slotManager.getNumberFreeSlots();
+
+               return CompletableFuture.completedFuture(
+                       new ResourceOverview(
+                               taskExecutors.size(),
+                               numberSlots,
+                               numberFreeSlots));
+       }
+
        // 
------------------------------------------------------------------------
        //  Internal methods
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index a957716..e0674b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -157,4 +157,13 @@ public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManager
         * @param cause for the disconnection of the JobManager
         */
        void disconnectJobManager(JobID jobId, Exception cause);
+
+       /**
+        * Requests the resource overview. The resource overview provides 
information about the
+        * connected TaskManagers, the total number of slots and the number of 
available slots.
+        *
+        * @param timeout of the request
+        * @return Future containing the resource overview
+        */
+       CompletableFuture<ResourceOverview> requestResourceOverview(@RpcTimeout 
Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
new file mode 100644
index 0000000..1b3d5ca
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Class containing information about the available cluster resources.
+ */
+public class ResourceOverview implements Serializable {
+
+       private static final long serialVersionUID = 7618746920569224557L;
+
+       private final int numberTaskManagers;
+
+       private final int numberRegisteredSlots;
+
+       private final int numberFreeSlots;
+
+       public ResourceOverview(int numberTaskManagers, int 
numberRegisteredSlots, int numberFreeSlots) {
+               this.numberTaskManagers = numberTaskManagers;
+               this.numberRegisteredSlots = numberRegisteredSlots;
+               this.numberFreeSlots = numberFreeSlots;
+       }
+
+       public int getNumberTaskManagers() {
+               return numberTaskManagers;
+       }
+
+       public int getNumberRegisteredSlots() {
+               return numberRegisteredSlots;
+       }
+
+       public int getNumberFreeSlots() {
+               return numberFreeSlots;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d8eb47c..2b52ce5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
-import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +65,7 @@ import java.util.concurrent.TimeoutException;
  * slots are currently not used) and pending slot requests time out triggering 
their release and
  * failure, respectively.
  */
-public class SlotManager<T extends AbstractID> implements AutoCloseable {
+public class SlotManager implements AutoCloseable {
        private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
 
        /** Scheduled executor for timeouts */
@@ -136,6 +136,14 @@ public class SlotManager<T extends AbstractID> implements 
AutoCloseable {
                started = false;
        }
 
+       public int getNumberRegisteredSlots() {
+               return slots.size();
+       }
+
+       public int getNumberFreeSlots() {
+               return freeSlots.size();
+       }
+
        // 
---------------------------------------------------------------------------------------------
        // Component lifecycle methods
        // 
---------------------------------------------------------------------------------------------
@@ -925,11 +933,6 @@ public class SlotManager<T extends AbstractID> implements 
AutoCloseable {
        }
 
        @VisibleForTesting
-       int getNumberRegisteredSlots() {
-               return slots.size();
-       }
-
-       @VisibleForTesting
        PendingSlotRequest getSlotRequest(AllocationID allocationId) {
                return pendingSlotRequests.get(allocationId);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index ff05088..e0e6c2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.FlinkException;
 
@@ -48,7 +48,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler 
implements LegacyRestHandler<DispatcherGateway, StatusOverviewWithVersion, 
EmptyMessageParameters> {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler 
implements LegacyRestHandler<DispatcherGateway, ClusterOverviewWithVersion, 
EmptyMessageParameters> {
 
        private static final String version = 
EnvironmentInformation.getVersion();
 
@@ -71,25 +71,25 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler implement
                // we need no parameters, get all requests
                try {
                        if (jobManagerGateway != null) {
-                               CompletableFuture<StatusOverview> 
overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+                               CompletableFuture<ClusterOverview> 
overviewFuture = jobManagerGateway.requestClusterOverview(timeout);
 
                                return overviewFuture.thenApplyAsync(
-                                       (StatusOverview overview) -> {
+                                       (ClusterOverview overview) -> {
                                                StringWriter writer = new 
StringWriter();
                                                try {
                                                        JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
                                                        gen.writeStartObject();
-                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_TASKMANAGERS, 
overview.getNumTaskManagersConnected());
-                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_TOTAL, 
overview.getNumSlotsTotal());
-                                                       
gen.writeNumberField(StatusOverview.FIELD_NAME_SLOTS_AVAILABLE, 
overview.getNumSlotsAvailable());
+                                                       
gen.writeNumberField(ClusterOverview.FIELD_NAME_TASKMANAGERS, 
overview.getNumTaskManagersConnected());
+                                                       
gen.writeNumberField(ClusterOverview.FIELD_NAME_SLOTS_TOTAL, 
overview.getNumSlotsTotal());
+                                                       
gen.writeNumberField(ClusterOverview.FIELD_NAME_SLOTS_AVAILABLE, 
overview.getNumSlotsAvailable());
                                                        
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_RUNNING, 
overview.getNumJobsRunningOrPending());
                                                        
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FINISHED, 
overview.getNumJobsFinished());
                                                        
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_CANCELLED, 
overview.getNumJobsCancelled());
                                                        
gen.writeNumberField(JobsOverview.FIELD_NAME_JOBS_FAILED, 
overview.getNumJobsFailed());
-                                                       
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_VERSION, version);
+                                                       
gen.writeStringField(ClusterOverviewWithVersion.FIELD_NAME_VERSION, version);
                                                        if 
(!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-                                                               
gen.writeStringField(StatusOverviewWithVersion.FIELD_NAME_COMMIT, commitID);
+                                                               
gen.writeStringField(ClusterOverviewWithVersion.FIELD_NAME_COMMIT, commitID);
                                                        }
                                                        gen.writeEndObject();
 
@@ -110,10 +110,10 @@ public class ClusterOverviewHandler extends 
AbstractJsonRequestHandler implement
        }
 
        @Override
-       public CompletableFuture<StatusOverviewWithVersion> 
handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, 
DispatcherGateway gateway) {
-               CompletableFuture<StatusOverview> overviewFuture = 
gateway.requestStatusOverview(timeout);
+       public CompletableFuture<ClusterOverviewWithVersion> 
handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, 
DispatcherGateway gateway) {
+               CompletableFuture<ClusterOverview> overviewFuture = 
gateway.requestClusterOverview(timeout);
 
                return overviewFuture.thenApply(
-                       statusOverview -> 
StatusOverviewWithVersion.fromStatusOverview(statusOverview, version, 
commitID));
+                       statusOverview -> 
ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, 
commitID));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
new file mode 100644
index 0000000..dc0902c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersion.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Cluster overview message including the current Flink version and commit id.
+ */
+public class ClusterOverviewWithVersion extends ClusterOverview implements 
ResponseBody {
+
+       private static final long serialVersionUID = 5000058311783413216L;
+
+       public static final String FIELD_NAME_VERSION = "flink-version";
+       public static final String FIELD_NAME_COMMIT = "flink-commit";
+
+       @JsonProperty(FIELD_NAME_VERSION)
+       private final String version;
+
+       @JsonProperty(FIELD_NAME_COMMIT)
+       private final String commitId;
+
+       @JsonCreator
+       public ClusterOverviewWithVersion(
+                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
+                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
+                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
+                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
+                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
+                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
+                       @JsonProperty(FIELD_NAME_VERSION) String version,
+                       @JsonProperty(FIELD_NAME_COMMIT) String commitId) {
+               super(
+                       numTaskManagersConnected,
+                       numSlotsTotal,
+                       numSlotsAvailable,
+                       numJobsRunningOrPending,
+                       numJobsFinished,
+                       numJobsCancelled,
+                       numJobsFailed);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       public ClusterOverviewWithVersion(
+                       int numTaskManagersConnected,
+                       int numSlotsTotal,
+                       int numSlotsAvailable,
+                       JobsOverview jobs1,
+                       JobsOverview jobs2,
+                       String version,
+                       String commitId) {
+               super(numTaskManagersConnected, numSlotsTotal, 
numSlotsAvailable, jobs1, jobs2);
+
+               this.version = Preconditions.checkNotNull(version);
+               this.commitId = Preconditions.checkNotNull(commitId);
+       }
+
+       public static ClusterOverviewWithVersion 
fromStatusOverview(ClusterOverview statusOverview, String version, String 
commitId) {
+               return new ClusterOverviewWithVersion(
+                       statusOverview.getNumTaskManagersConnected(),
+                       statusOverview.getNumSlotsTotal(),
+                       statusOverview.getNumSlotsAvailable(),
+                       statusOverview.getNumJobsRunningOrPending(),
+                       statusOverview.getNumJobsFinished(),
+                       statusOverview.getNumJobsCancelled(),
+                       statusOverview.getNumJobsFailed(),
+                       version,
+                       commitId);
+       }
+
+       public String getVersion() {
+               return version;
+       }
+
+       public String getCommitId() {
+               return commitId;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+
+               ClusterOverviewWithVersion that = (ClusterOverviewWithVersion) 
o;
+
+               return Objects.equals(version, that.getVersion()) && 
Objects.equals(commitId, that.getCommitId());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = super.hashCode();
+               result = 31 * result + (version != null ? version.hashCode() : 
0);
+               result = 31 * result + (commitId != null ? commitId.hashCode() 
: 0);
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index f8411e4..688be79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
  * Message headers for the {@link ClusterOverviewHandler}.
  */
-public final class ClusterOverviewHeaders implements 
MessageHeaders<EmptyRequestBody, StatusOverviewWithVersion, 
EmptyMessageParameters> {
+public final class ClusterOverviewHeaders implements 
MessageHeaders<EmptyRequestBody, ClusterOverviewWithVersion, 
EmptyMessageParameters> {
 
        private static final ClusterOverviewHeaders INSTANCE = new 
ClusterOverviewHeaders();
 
@@ -51,8 +52,8 @@ public final class ClusterOverviewHeaders implements 
MessageHeaders<EmptyRequest
        }
 
        @Override
-       public Class<StatusOverviewWithVersion> getResponseClass() {
-               return StatusOverviewWithVersion.class;
+       public Class<ClusterOverviewWithVersion> getResponseClass() {
+               return ClusterOverviewWithVersion.class;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java
deleted file mode 100644
index 1ee5c86..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersion.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Status overview message including the current Flink version and commit id.
- */
-public class StatusOverviewWithVersion extends StatusOverview implements 
ResponseBody {
-
-       private static final long serialVersionUID = 5000058311783413216L;
-
-       public static final String FIELD_NAME_VERSION = "flink-version";
-       public static final String FIELD_NAME_COMMIT = "flink-commit";
-
-       @JsonProperty(FIELD_NAME_VERSION)
-       private final String version;
-
-       @JsonProperty(FIELD_NAME_COMMIT)
-       private final String commitId;
-
-       @JsonCreator
-       public StatusOverviewWithVersion(
-                       @JsonProperty(FIELD_NAME_TASKMANAGERS) int 
numTaskManagersConnected,
-                       @JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
-                       @JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int 
numSlotsAvailable,
-                       @JsonProperty(FIELD_NAME_JOBS_RUNNING) int 
numJobsRunningOrPending,
-                       @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
-                       @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
-                       @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
-                       @JsonProperty(FIELD_NAME_VERSION) String version,
-                       @JsonProperty(FIELD_NAME_COMMIT) String commitId) {
-               super(
-                       numTaskManagersConnected,
-                       numSlotsTotal,
-                       numSlotsAvailable,
-                       numJobsRunningOrPending,
-                       numJobsFinished,
-                       numJobsCancelled,
-                       numJobsFailed);
-
-               this.version = Preconditions.checkNotNull(version);
-               this.commitId = Preconditions.checkNotNull(commitId);
-       }
-
-       public StatusOverviewWithVersion(
-                       int numTaskManagersConnected,
-                       int numSlotsTotal,
-                       int numSlotsAvailable,
-                       JobsOverview jobs1,
-                       JobsOverview jobs2,
-                       String version,
-                       String commitId) {
-               super(numTaskManagersConnected, numSlotsTotal, 
numSlotsAvailable, jobs1, jobs2);
-
-               this.version = Preconditions.checkNotNull(version);
-               this.commitId = Preconditions.checkNotNull(commitId);
-       }
-
-       public static StatusOverviewWithVersion 
fromStatusOverview(StatusOverview statusOverview, String version, String 
commitId) {
-               return new StatusOverviewWithVersion(
-                       statusOverview.getNumTaskManagersConnected(),
-                       statusOverview.getNumSlotsTotal(),
-                       statusOverview.getNumSlotsAvailable(),
-                       statusOverview.getNumJobsRunningOrPending(),
-                       statusOverview.getNumJobsFinished(),
-                       statusOverview.getNumJobsCancelled(),
-                       statusOverview.getNumJobsFailed(),
-                       version,
-                       commitId);
-       }
-
-       public String getVersion() {
-               return version;
-       }
-
-       public String getCommitId() {
-               return commitId;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               if (!super.equals(o)) {
-                       return false;
-               }
-
-               StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
-
-               return Objects.equals(version, that.getVersion()) && 
Objects.equals(commitId, that.getCommitId());
-       }
-
-       @Override
-       public int hashCode() {
-               int result = super.hashCode();
-               result = 31 * result + (version != null ? version.hashCode() : 
0);
-               result = 31 * result + (commitId != null ? commitId.hashCode() 
: 0);
-               return result;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index d5d194d..b2fc026 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
@@ -75,5 +75,5 @@ public interface RestfulGateway extends RpcGateway {
         * @param timeout for the asynchronous operation
         * @return Future containing the status overview
         */
-       CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout 
Time timeout);
+       CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout 
Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 85a6aed..b7d5dda 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1676,7 +1676,7 @@ class JobManager(
           val future = (archive ? RequestJobsOverview.getInstance())(timeout)
           future.onSuccess {
             case archiveOverview: JobsOverview =>
-              theSender ! new StatusOverview(numTMs, numSlotsTotal, 
numSlotsAvailable,
+              theSender ! new ClusterOverview(numTMs, numSlotsTotal, 
numSlotsAvailable,
                 ourJobs, archiveOverview)
           }(context.dispatcher)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1ec1cc3..682b907 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -105,6 +106,7 @@ public class DispatcherTest extends TestLogger {
                        Dispatcher.DISPATCHER_NAME,
                        new Configuration(),
                        haServices,
+                       mock(ResourceManagerGateway.class),
                        mock(BlobServer.class),
                        heartbeatServices,
                        mock(MetricRegistry.class),
@@ -164,6 +166,7 @@ public class DispatcherTest extends TestLogger {
                        Dispatcher.DISPATCHER_NAME,
                        new Configuration(),
                        haServices,
+                       mock(ResourceManagerGateway.class),
                        mock(BlobServer.class),
                        heartbeatServices,
                        mock(MetricRegistry.class),
@@ -198,6 +201,7 @@ public class DispatcherTest extends TestLogger {
                                String endpointId,
                                Configuration configuration,
                                HighAvailabilityServices 
highAvailabilityServices,
+                               ResourceManagerGateway resourceManagerGateway,
                                BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
                                MetricRegistry metricRegistry,
@@ -209,6 +213,7 @@ public class DispatcherTest extends TestLogger {
                                endpointId,
                                configuration,
                                highAvailabilityServices,
+                               resourceManagerGateway,
                                blobServer,
                                heartbeatServices,
                                metricRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index f5f4976..b11ff6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -53,7 +53,7 @@ public class WebMonitorMessagesTest {
                        
GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
 
                        
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobDetails.class,
 rnd));
-                       
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(StatusOverview.class,
 rnd));
+                       
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class,
 rnd));
                        
GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class,
 rnd));
                        
                        GenericMessageTester.testMessageInstance(new 
JobsWithIDsOverview(

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
new file mode 100644
index 0000000..c7b814a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for the {@link ClusterOverviewWithVersion}.
+ */
+public class ClusterOverviewWithVersionTest extends 
RestResponseMarshallingTestBase<ClusterOverviewWithVersion> {
+
+       @Override
+       protected Class<ClusterOverviewWithVersion> getTestResponseClass() {
+               return ClusterOverviewWithVersion.class;
+       }
+
+       @Override
+       protected ClusterOverviewWithVersion getTestResponseInstance() {
+               return new ClusterOverviewWithVersion(
+                       1,
+                       3,
+                       3,
+                       7,
+                       4,
+                       2,
+                       0,
+                       "version",
+                       "commit");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/660a45ca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
deleted file mode 100644
index b2376c5..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-/**
- * Tests for the {@link StatusOverviewWithVersion}.
- */
-public class StatusOverviewWithVersionTest extends 
RestResponseMarshallingTestBase<StatusOverviewWithVersion> {
-
-       @Override
-       protected Class<StatusOverviewWithVersion> getTestResponseClass() {
-               return StatusOverviewWithVersion.class;
-       }
-
-       @Override
-       protected StatusOverviewWithVersion getTestResponseInstance() {
-               return new StatusOverviewWithVersion(
-                       1,
-                       3,
-                       3,
-                       7,
-                       4,
-                       2,
-                       0,
-                       "version",
-                       "commit");
-       }
-}

Reply via email to