asfgit closed pull request #6859: Revert FLINK-10354 and FLINK-10247 for 
release-1.6
URL: https://github.com/apache/flink/pull/6859
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 5f52abd38ed..0fe9d0c36f8 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,11 +7,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>metrics.internal.query-service.port</h5></td>
-            <td style="word-wrap: break-word;">"0"</td>
-            <td>The port range used for Flink's internal metric query service. 
Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination 
of both. It is recommended to set a range of ports to avoid collisions when 
multiple Flink components are running on the same machine. Per default Flink 
will pick a random port.</td>
-        </tr>
         <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"subtask"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index e76b7f2dbbe..fc6b3c14c46 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -130,18 +130,6 @@
                        .defaultValue(128)
                        .withDescription("Defines the number of measured 
latencies to maintain at each operator.");
 
-       /**
-        * The default network port range for Flink's internal metric query 
service. The {@code "0"} means that
-        * Flink searches for a free port.
-        */
-       public static final ConfigOption<String> QUERY_SERVICE_PORT =
-               key("metrics.internal.query-service.port")
-               .defaultValue("0")
-               .withDescription("The port range used for Flink's internal 
metric query service. Accepts a list of ports " +
-                       "(“50100,50101”), ranges(“50100-50200”) or a 
combination of both. It is recommended to set a range of " +
-                       "ports to avoid collisions when multiple Flink 
components are running on the same machine. Per default " +
-                       "Flink will pick a random port.");
-
        private MetricOptions() {
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java 
b/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
deleted file mode 100644
index 89dcea45093..00000000000
--- a/flink-core/src/main/java/org/apache/flink/types/SerializableOptional.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.types;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Serializable {@link Optional}.
- */
-public final class SerializableOptional<T extends Serializable> implements 
Serializable {
-       private static final long serialVersionUID = -3312769593551775940L;
-
-       private static final SerializableOptional<?> EMPTY = new 
SerializableOptional<>(null);
-
-       @Nullable
-       private final T value;
-
-       private SerializableOptional(@Nullable T value) {
-               this.value = value;
-       }
-
-       public T get() {
-               if (value == null) {
-                       throw new NoSuchElementException("No value present");
-               }
-               return value;
-       }
-
-       public boolean isPresent() {
-               return value != null;
-       }
-
-       public void ifPresent(Consumer<? super T> consumer) {
-               if (value != null) {
-                       consumer.accept(value);
-               }
-       }
-
-       public <R> Optional<R> map(Function<? super T, ? extends R> mapper) {
-               if (value == null) {
-                       return Optional.empty();
-               } else {
-                       return Optional.ofNullable(mapper.apply(value));
-               }
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
of(@Nonnull T value) {
-               return new SerializableOptional<>(value);
-       }
-
-       public static <T extends Serializable> SerializableOptional<T> 
ofNullable(@Nullable T value) {
-               if (value == null) {
-                       return empty();
-               } else {
-                       return of(value);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public static <T extends Serializable> SerializableOptional<T> empty() {
-               return (SerializableOptional<T>) EMPTY;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..e936b246222 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -839,22 +839,28 @@ private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) thro
                        // the pending checkpoint must be discarded after the 
finalization
                        
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint 
!= null);
 
-                       try {
-                               
completedCheckpointStore.addCheckpoint(completedCheckpoint);
-                       } catch (Exception exception) {
-                               // we failed to store the completed checkpoint. 
Let's clean up
-                               executor.execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       
completedCheckpoint.discardOnFailedStoring();
-                                               } catch (Throwable t) {
-                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", 
completedCheckpoint.getCheckpointID(), t);
+                       // TODO: add savepoints to completed checkpoint store 
once FLINK-4815 has been completed
+                       if (!completedCheckpoint.getProperties().isSavepoint()) 
{
+                               try {
+                                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+                               } catch (Exception exception) {
+                                       // we failed to store the completed 
checkpoint. Let's clean up
+                                       executor.execute(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       try {
+                                                               
completedCheckpoint.discardOnFailedStoring();
+                                                       } catch (Throwable t) {
+                                                               LOG.warn("Could 
not properly discard completed checkpoint {} of job {}.", 
completedCheckpoint.getCheckpointID(), job, t);
+                                                       }
                                                }
-                                       }
-                               });
+                                       });
 
-                               throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.', exception);
+                                       throw new CheckpointException("Could 
not complete the pending checkpoint " + checkpointId + '.', exception);
+                               }
+
+                               // drop those pending checkpoints that are at 
prior to the completed one
+                               dropSubsumedCheckpoints(checkpointId);
                        }
                } finally {
                        pendingCheckpoints.remove(checkpointId);
@@ -864,9 +870,6 @@ public void run() {
 
                rememberRecentCheckpointId(checkpointId);
 
-               // drop those pending checkpoints that are at prior to the 
completed one
-               dropSubsumedCheckpoints(checkpointId);
-
                // record the time when this was completed, to calculate
                // the 'min delay between checkpoints'
                lastCheckpointCompletionNanos = System.nanoTime();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 00b61737d20..56e45762263 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -46,7 +46,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -86,66 +85,13 @@
         * @param portRangeDefinition The port range to choose a port from.
         * @param logger The logger to output log information.
         * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               String portRangeDefinition,
-               Logger logger) throws Exception {
-               return startActorSystem(
-                       configuration,
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
-        */
-       public static ActorSystem startActorSystem(
-                       Configuration configuration,
-                       String listeningAddress,
-                       String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       portRangeDefinition,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an ActorSystem with the given configuration listening at the 
address/ports.
-        *
-        * @param configuration The Flink configuration
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param portRangeDefinition The port range to choose a port from.
-        * @param logger The logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started
-        * @throws Exception Thrown when actor system cannot be started in 
specified port range
+        * @throws Exception
         */
        public static ActorSystem startActorSystem(
                        Configuration configuration,
-                       String actorSystemName,
                        String listeningAddress,
                        String portRangeDefinition,
-                       Logger logger,
-                       @Nonnull ActorSystemExecutorMode executorMode) throws 
Exception {
+                       Logger logger) throws Exception {
 
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
@@ -171,13 +117,7 @@ public static ActorSystem startActorSystem(
                        }
 
                        try {
-                               return startActorSystem(
-                                       configuration,
-                                       actorSystemName,
-                                       listeningAddress,
-                                       port,
-                                       logger,
-                                       executorMode);
+                               return startActorSystem(configuration, 
listeningAddress, port, logger);
                        }
                        catch (Exception e) {
                                // we can continue to try if this contains a 
netty channel exception
@@ -196,7 +136,6 @@ public static ActorSystem startActorSystem(
 
        /**
         * Starts an Actor System at a specific port.
-        *
         * @param configuration The Flink configuration.
         * @param listeningAddress The address to listen at.
         * @param listeningPort The port to listen at.
@@ -204,57 +143,11 @@ public static ActorSystem startActorSystem(
         * @return The ActorSystem which has been started.
         * @throws Exception
         */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger) throws Exception {
-               return startActorSystem(configuration, listeningAddress, 
listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
        public static ActorSystem startActorSystem(
                                Configuration configuration,
                                String listeningAddress,
                                int listeningPort,
-                               Logger logger,
-                               ActorSystemExecutorMode executorMode) throws 
Exception {
-               return startActorSystem(
-                       configuration,
-                       AkkaUtils.getFlinkActorSystemName(),
-                       listeningAddress,
-                       listeningPort,
-                       logger,
-                       executorMode);
-       }
-
-       /**
-        * Starts an Actor System at a specific port.
-        * @param configuration The Flink configuration.
-        * @param actorSystemName Name of the started {@link ActorSystem}
-        * @param listeningAddress The address to listen at.
-        * @param listeningPort The port to listen at.
-        * @param logger the logger to output log information.
-        * @param executorMode The executor mode of Akka actor system.
-        * @return The ActorSystem which has been started.
-        * @throws Exception
-        */
-       public static ActorSystem startActorSystem(
-               Configuration configuration,
-               String actorSystemName,
-               String listeningAddress,
-               int listeningPort,
-               Logger logger,
-               ActorSystemExecutorMode executorMode) throws Exception {
+                               Logger logger) throws Exception {
 
                String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
                logger.info("Trying to start actor system at {}", hostPortUrl);
@@ -262,13 +155,12 @@ public static ActorSystem startActorSystem(
                try {
                        Config akkaConfig = AkkaUtils.getAkkaConfig(
                                configuration,
-                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort)),
-                               getExecutorConfigByExecutorMode(configuration, 
executorMode)
+                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort))
                        );
 
                        logger.debug("Using akka configuration\n {}", 
akkaConfig);
 
-                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(actorSystemName, akkaConfig);
+                       ActorSystem actorSystem = 
AkkaUtils.createActorSystem(akkaConfig);
 
                        logger.info("Actor system started at {}", 
AkkaUtils.getAddress(actorSystem));
                        return actorSystem;
@@ -278,24 +170,13 @@ public static ActorSystem startActorSystem(
                                Throwable cause = t.getCause();
                                if (cause != null && t.getCause() instanceof 
BindException) {
                                        throw new IOException("Unable to create 
ActorSystem at address " + hostPortUrl +
-                                               " : " + cause.getMessage(), t);
+                                                       " : " + 
cause.getMessage(), t);
                                }
                        }
                        throw new Exception("Could not create actor system", t);
                }
        }
 
-       private static Config getExecutorConfigByExecutorMode(Configuration 
configuration, ActorSystemExecutorMode executorMode) {
-               switch (executorMode) {
-                       case FORK_JOIN_EXECUTOR:
-                               return 
AkkaUtils.getForkJoinExecutorConfig(configuration);
-                       case FIXED_THREAD_POOL_EXECUTOR:
-                               return AkkaUtils.getThreadPoolExecutorConfig();
-                       default:
-                               throw new 
IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", 
executorMode));
-               }
-       }
-
        /**
         * Starts the web frontend.
         *
@@ -623,14 +504,4 @@ public static Configuration 
cloneConfiguration(Configuration configuration) {
 
                return clonedConfiguration;
        }
-
-       /**
-        * Options to specify which executor to use in an {@link ActorSystem}.
-        */
-       public enum ActorSystemExecutorMode {
-               /** Used by default, use dispatcher with fork-join-executor. **/
-               FORK_JOIN_EXECUTOR,
-               /** Use dispatcher with fixed thread pool executor. **/
-               FIXED_THREAD_POOL_EXECUTOR
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 2a9baa925ff..fd0a0a1d39f 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -96,8 +96,6 @@
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -156,9 +154,6 @@
        @GuardedBy("lock")
        private WebMonitorEndpoint<?> webMonitorEndpoint;
 
-       @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
        @GuardedBy("lock")
        private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
@@ -281,9 +276,9 @@ protected void initializeServices(Configuration 
configuration) throws Exception
                        metricRegistry = createMetricRegistry(configuration);
 
                        // TODO: This is a temporary hack until we have ported 
the MetricQueryService to the new RpcEndpoint
-                       // Start actor system for metric query service on any 
available port
-                       metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
-                       
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                       // start the MetricQueryService
+                       final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
+                       metricRegistry.startQueryService(actorSystem, null);
 
                        archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
commonRpcService.getScheduledExecutor());
 
@@ -401,7 +396,7 @@ protected RpcService createRpcService(
                        Configuration configuration,
                        String bindAddress,
                        String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, 
FORK_JOIN_EXECUTOR);
+               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
                FiniteDuration duration = AkkaUtils.getTimeout(configuration);
                return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
        }
@@ -469,10 +464,6 @@ protected MetricRegistryImpl 
createMetricRegistry(Configuration configuration) {
                                
terminationFutures.add(metricRegistry.shutdown());
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        if (commonRpcService != null) {
                                
terminationFutures.add(commonRpcService.stopService());
                        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 39e5f44583c..3fd268a1aeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -18,11 +18,8 @@
 
 package org.apache.flink.runtime.metrics.util;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -30,7 +27,6 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,15 +45,12 @@
 import java.lang.management.ThreadMXBean;
 import java.util.List;
 
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
-
 /**
  * Utility class to register pre-defined metric sets.
  */
 public class MetricUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
        private static final String METRIC_GROUP_STATUS_NAME = "Status";
-       private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics";
 
        private MetricUtils() {
        }
@@ -109,17 +102,6 @@ public static void instantiateStatusMetrics(
                instantiateCPUMetrics(jvm.addGroup("CPU"));
        }
 
-       public static ActorSystem startMetricsActorSystem(Configuration 
configuration, String hostname, Logger logger) throws Exception {
-               final String portRange = 
configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
-               return BootstrapTools.startActorSystem(
-                       configuration,
-                       METRICS_ACTOR_SYSTEM_NAME,
-                       hostname,
-                       portRange,
-                       logger,
-                       FIXED_THREAD_POOL_EXECUTOR);
-       }
-
        private static void instantiateNetworkMetrics(
                MetricGroup metrics,
                final NetworkEnvironment network) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bcd4c7b93fb..4bfdb25c4d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -133,9 +133,6 @@
        @GuardedBy("lock")
        private RpcService resourceManagerRpcService;
 
-       @GuardedBy("lock")
-       private ActorSystem metricQueryServiceActorSystem;
-
        @GuardedBy("lock")
        private HighAvailabilityServices haServices;
 
@@ -255,11 +252,8 @@ public void start() throws Exception {
                                commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
                                // TODO: Temporary hack until the metric query 
service is ported to the RpcEndpoint
-                               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(
-                                       configuration,
-                                       commonRpcService.getAddress(),
-                                       LOG);
-                               
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
+                               final ActorSystem actorSystem = 
((AkkaRpcService) commonRpcService).getActorSystem();
+                               metricRegistry.startQueryService(actorSystem, 
null);
 
                                if (useSingleRpcService) {
                                        for (int i = 0; i < numTaskManagers; 
i++) {
@@ -356,7 +350,7 @@ public void start() throws Exception {
                                                
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                                                "DispatcherRestEndpoint"),
                                        new AkkaQueryServiceRetriever(
-                                               metricQueryServiceActorSystem,
+                                               actorSystem,
                                                
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                                        
haServices.getWebMonitorLeaderElectionService(),
                                        new ShutDownFatalErrorHandler());
@@ -453,12 +447,24 @@ public void start() throws Exception {
 
                                        final FutureUtils.ConjunctFuture<Void> 
componentsTerminationFuture = 
FutureUtils.completeAll(componentTerminationFutures);
 
-                                       final CompletableFuture<Void> 
metricSystemTerminationFuture = FutureUtils.composeAfterwards(
+                                       final CompletableFuture<Void> 
metricRegistryTerminationFuture = FutureUtils.runAfterwards(
                                                componentsTerminationFuture,
-                                               this::closeMetricSystem);
+                                               () -> {
+                                                       synchronized (lock) {
+                                                               if 
(jobManagerMetricGroup != null) {
+                                                                       
jobManagerMetricGroup.close();
+                                                                       
jobManagerMetricGroup = null;
+                                                               }
+                                                               // metrics 
shutdown
+                                                               if 
(metricRegistry != null) {
+                                                                       
metricRegistry.shutdown();
+                                                                       
metricRegistry = null;
+                                                               }
+                                                       }
+                                               });
 
                                        // shut down the RpcServices
-                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricSystemTerminationFuture
+                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricRegistryTerminationFuture
                                                .thenCompose((Void ignored) -> 
terminateRpcServices());
 
                                        final CompletableFuture<Void> 
remainingServicesTerminationFuture = FutureUtils.runAfterwards(
@@ -482,29 +488,6 @@ public void start() throws Exception {
                }
        }
 
-       private CompletableFuture<Void> closeMetricSystem() {
-               synchronized (lock) {
-                       if (jobManagerMetricGroup != null) {
-                               jobManagerMetricGroup.close();
-                               jobManagerMetricGroup = null;
-                       }
-
-                       final ArrayList<CompletableFuture<Void>> 
terminationFutures = new ArrayList<>(2);
-
-                       // metrics shutdown
-                       if (metricRegistry != null) {
-                               
terminationFutures.add(metricRegistry.shutdown());
-                               metricRegistry = null;
-                       }
-
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
-                       return FutureUtils.completeAll(terminationFutures);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Accessing jobs
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a3a075d0f98..d78e3465ec4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -48,6 +48,7 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -75,13 +76,11 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -594,26 +593,19 @@ public void unRegisterInfoMessageListener(final String 
address) {
 
        @Override
        public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
-               final ArrayList<CompletableFuture<Optional<Tuple2<ResourceID, 
String>>>> metricQueryServicePathFutures = new 
ArrayList<>(taskExecutors.size());
+               final ArrayList<Tuple2<ResourceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
                for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> 
workerRegistrationEntry : taskExecutors.entrySet()) {
                        final ResourceID tmResourceId = 
workerRegistrationEntry.getKey();
                        final WorkerRegistration<WorkerType> workerRegistration 
= workerRegistrationEntry.getValue();
-                       final TaskExecutorGateway taskExecutorGateway = 
workerRegistration.getTaskExecutorGateway();
+                       final String taskManagerAddress = 
workerRegistration.getTaskExecutorGateway().getAddress();
+                       final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
+                               MetricQueryService.METRIC_QUERY_SERVICE_NAME + 
'_' + tmResourceId.getResourceIdString();
 
-                       final CompletableFuture<Optional<Tuple2<ResourceID, 
String>>> metricQueryServicePathFuture = taskExecutorGateway
-                               .requestMetricQueryServiceAddress(timeout)
-                               .thenApply(optional -> optional.map(path -> 
Tuple2.of(tmResourceId, path)));
-
-                       
metricQueryServicePathFutures.add(metricQueryServicePathFuture);
+                       metricQueryServicePaths.add(Tuple2.of(tmResourceId, 
tmMetricQueryServicePath));
                }
 
-               return 
FutureUtils.combineAll(metricQueryServicePathFutures).thenApply(
-                       collection -> collection
-                               .stream()
-                               .filter(Optional::isPresent)
-                               .map(Optional::get)
-                               .collect(Collectors.toList()));
+               return 
CompletableFuture.completedFuture(metricQueryServicePaths);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..3a626986361 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -70,10 +70,7 @@
         * @throws IOException      Thrown, if the actor system can not bind to 
the address
         * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
         */
-       public static RpcService createRpcService(
-               String hostname,
-               int port,
-               Configuration configuration) throws Exception {
+       public static RpcService createRpcService(String hostname, int port, 
Configuration configuration) throws Exception {
                LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
                final ActorSystem actorSystem;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f90e939bc01..33db7e13076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -102,7 +102,6 @@
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -158,10 +157,6 @@
 
        private final BlobCacheService blobCacheService;
 
-       /** The path to metric query service on this Task Manager. */
-       @Nullable
-       private final String metricQueryServicePath;
-
        // --------- TaskManager services --------
 
        /** The connection information of this task manager. */
@@ -216,7 +211,6 @@ public TaskExecutor(
                        TaskManagerServices taskExecutorServices,
                        HeartbeatServices heartbeatServices,
                        TaskManagerMetricGroup taskManagerMetricGroup,
-                       @Nullable String metricQueryServicePath,
                        BlobCacheService blobCacheService,
                        FatalErrorHandler fatalErrorHandler) {
 
@@ -230,7 +224,6 @@ public TaskExecutor(
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.blobCacheService = checkNotNull(blobCacheService);
-               this.metricQueryServicePath = metricQueryServicePath;
 
                this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
                this.jobManagerTable = 
taskExecutorServices.getJobManagerTable();
@@ -854,11 +847,6 @@ public void heartbeatFromResourceManager(ResourceID 
resourceID) {
                }
        }
 
-       @Override
-       public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(Time timeout) {
-               return 
CompletableFuture.completedFuture(SerializableOptional.ofNullable(metricQueryServicePath));
-       }
-
        // 
----------------------------------------------------------------------
        // Disconnection RPCs
        // 
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d6b9e152e8f..4f792896216 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -36,7 +36,6 @@
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.types.SerializableOptional;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -196,11 +195,4 @@
         * @return Future which is completed with the {@link TransientBlobKey} 
of the uploaded file.
         */
        CompletableFuture<TransientBlobKey> requestFileUpload(FileType 
fileType, @RpcTimeout Time timeout);
-
-       /**
-        * Returns the fully qualified address of Metric Query Service on the 
TaskManager.
-        *
-        * @return Future String with Fully qualified (RPC) address of Metric 
Query Service on the TaskManager.
-        */
-       CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(@RpcTimeout Time timeout);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f830ae1968a..42fe5bf658b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -100,8 +101,6 @@
 
        private final RpcService rpcService;
 
-       private final ActorSystem metricQueryServiceActorSystem;
-
        private final HighAvailabilityServices highAvailabilityServices;
 
        private final MetricRegistryImpl metricRegistry;
@@ -133,14 +132,14 @@ public TaskManagerRunner(Configuration configuration, 
ResourceID resourceId) thr
                        
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
                rpcService = createRpcService(configuration, 
highAvailabilityServices);
-               metricQueryServiceActorSystem = 
MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), 
LOG);
 
                HeartbeatServices heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
                metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
                // TODO: Temporary hack until the MetricQueryService has been 
ported to RpcEndpoint
-               metricRegistry.startQueryService(metricQueryServiceActorSystem, 
resourceId);
+               final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, resourceId);
 
                blobCacheService = new BlobCacheService(
                        configuration, 
highAvailabilityServices.createBlobStore(), null
@@ -160,7 +159,7 @@ public TaskManagerRunner(Configuration configuration, 
ResourceID resourceId) thr
                this.terminationFuture = new CompletableFuture<>();
                this.shutdown = false;
 
-               MemoryLogger.startIfConfigured(LOG, configuration, 
metricQueryServiceActorSystem);
+               MemoryLogger.startIfConfigured(LOG, configuration, actorSystem);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -215,10 +214,6 @@ public void start() throws Exception {
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       if (metricQueryServiceActorSystem != null) {
-                               
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
-                       }
-
                        try {
                                highAvailabilityServices.close();
                        } catch (Exception e) {
@@ -378,8 +373,6 @@ public static TaskExecutor startTaskManager(
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
-               String metricQueryServicePath = 
metricRegistry.getMetricQueryServicePath();
-
                return new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
@@ -387,7 +380,6 @@ public static TaskExecutor startTaskManager(
                        taskManagerServices,
                        heartbeatServices,
                        taskManagerMetricGroup,
-                       metricQueryServicePath,
                        blobCacheService,
                        fatalErrorHandler);
        }
@@ -424,14 +416,6 @@ public static RpcService createRpcService(
 
                final String portRangeDefinition = 
configuration.getString(TaskManagerOptions.RPC_PORT);
 
-               return bindWithPort(configuration, taskManagerHostname, 
portRangeDefinition);
-       }
-
-       private static RpcService bindWithPort(
-               Configuration configuration,
-               String taskManagerHostname,
-               String portRangeDefinition) throws Exception{
-
                // parse port range definition and create port iterator
                Iterator<Integer> portsIterator;
                try {
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2b0c939e574..dcf0fdd9bdf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -50,12 +50,6 @@ object AkkaUtils {
 
   val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
-  val FLINK_ACTOR_SYSTEM_NAME = "flink"
-
-  def getFlinkActorSystemName = {
-    FLINK_ACTOR_SYSTEM_NAME
-  }
-
   /**
    * Creates a local actor system without remoting.
    *
@@ -109,19 +103,9 @@ object AkkaUtils {
    * @return created actor system
    */
   def createActorSystem(akkaConfig: Config): ActorSystem = {
-    createActorSystem(FLINK_ACTOR_SYSTEM_NAME, akkaConfig)
-  }
-
-  /**
-    * Creates an actor system with the given akka config.
-    *
-    * @param akkaConfig configuration for the actor system
-    * @return created actor system
-    */
-  def createActorSystem(actorSystemName: String, akkaConfig: Config): 
ActorSystem = {
     // Initialize slf4j as logger of Akka's Netty instead of java.util.logging 
(FLINK-1650)
     InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
-    ActorSystem.create(actorSystemName, akkaConfig)
+    ActorSystem.create("flink", akkaConfig)
   }
 
   /**
@@ -135,23 +119,7 @@ object AkkaUtils {
   }
 
   /**
-    * Returns a remote Akka config for the given configuration values.
-    *
-    * @param configuration containing the user provided configuration values
-    * @param hostname to bind against. If null, then the loopback interface is 
used
-    * @param port to bind against
-    * @param executorMode containing the user specified mode of executor
-    * @return A remote Akka config
-    */
-  def getAkkaConfig(configuration: Configuration,
-                    hostname: String,
-                    port: Int,
-                    executorConfig: Config): Config = {
-    getAkkaConfig(configuration, Some((hostname, port)), executorConfig)
-  }
-
-  /**
-    * Returns a remote Akka config for the given configuration values.
+    * Return a remote Akka config for the given configuration values.
     *
     * @param configuration containing the user provided configuration values
     * @param hostname to bind against. If null, then the loopback interface is 
used
@@ -187,25 +155,7 @@ object AkkaUtils {
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     externalAddress: Option[(String, Int)]): Config = {
-    getAkkaConfig(configuration, externalAddress, 
getForkJoinExecutorConfig(configuration))
-  }
-
-  /**
-    * Creates an akka config with the provided configuration values. If the 
listening address is
-    * specified, then the actor system will listen on the respective address.
-    *
-    * @param configuration instance containing the user provided configuration 
values
-    * @param externalAddress optional tuple of bindAddress and port to be 
reachable at.
-    *                        If None is given, then an Akka config for local 
actor system
-    *                        will be returned
-    * @param executorConfig config defining the used executor by the default 
dispatcher
-    * @return Akka config
-    */
-  @throws(classOf[UnknownHostException])
-  def getAkkaConfig(configuration: Configuration,
-                    externalAddress: Option[(String, Int)],
-                    executorConfig: Config): Config = {
-    val defaultConfig = 
getBasicAkkaConfig(configuration).withFallback(executorConfig)
+    val defaultConfig = getBasicAkkaConfig(configuration)
 
     externalAddress match {
 
@@ -257,6 +207,24 @@ object AkkaUtils {
     val supervisorStrategy = 
classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -283,6 +251,8 @@ object AkkaUtils {
         |
         |   default-dispatcher {
         |     throughput = $akkaThroughput
+        |
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -291,53 +261,6 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
-  def getThreadPoolExecutorConfig: Config = {
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "thread-pool-executor"
-       |      thread-pool-executor {
-       |        core-pool-size-min = 2
-       |        core-pool-size-factor = 2.0
-       |        core-pool-size-max = 4
-       |      }
-       |    }
-       |  }
-       |}
-        """.
-      stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
-  def getForkJoinExecutorConfig(configuration: Configuration): Config = {
-    val forkJoinExecutorParallelismFactor =
-      
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
-
-    val forkJoinExecutorParallelismMin =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
-
-    val forkJoinExecutorParallelismMax =
-      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
-
-    val configString = s"""
-       |akka {
-       |  actor {
-       |    default-dispatcher {
-       |      executor = "fork-join-executor"
-       |      fork-join-executor {
-       |        parallelism-factor = $forkJoinExecutorParallelismFactor
-       |        parallelism-min = $forkJoinExecutorParallelismMin
-       |        parallelism-max = $forkJoinExecutorParallelismMax
-       |      }
-       |    }
-       |  }
-       |}""".stripMargin
-
-    ConfigFactory.parseString(configString)
-  }
-
   def testDispatcherConfig: Config = {
     val config =
       s"""
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 3650f43066d..b113e12ef69 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1494,8 +1494,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                assertTrue(pending.isDiscarded());
                assertTrue(savepointFuture.isDone());
 
-               // the now we should have a completed checkpoint
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               // the now the savepoint should be completed but not added to 
the completed checkpoint store
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
                // validate that the relevant tasks got a confirmation message
@@ -1510,7 +1510,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                        verify(subtaskState2, 
times(1)).registerSharedStates(any(SharedStateRegistry.class));
                }
 
-               CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
+               CompletedCheckpoint success = savepointFuture.get();
                assertEquals(jid, success.getJobId());
                assertEquals(timestamp, success.getTimestamp());
                assertEquals(pending.getCheckpointId(), 
success.getCheckpointID());
@@ -1528,9 +1528,9 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-               CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
+               CompletedCheckpoint successNew = savepointFuture.get();
                assertEquals(jid, successNew.getJobId());
                assertEquals(timestampNew, successNew.getTimestamp());
                assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1557,7 +1557,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws 
Exception {
         * Triggers a savepoint and two checkpoints. The second checkpoint 
completes
         * and subsumes the first checkpoint, but not the first savepoint. Then 
we
         * trigger another checkpoint and savepoint. The 2nd savepoint 
completes and
-        * subsumes the last checkpoint, but not the first savepoint.
+        * does neither subsume the last checkpoint nor the first savepoint.
         */
        @Test
        public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1614,18 +1614,19 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
                assertFalse(savepointFuture1.isDone());
 
                assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
+               long checkpointId3 = counter.getLast();
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
                CompletableFuture<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
                long savepointId2 = counter.getLast();
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-               // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
+               // 2nd savepoint should not subsume the last checkpoint and the 
1st savepoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2));
 
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(2, coord.getNumberOfPendingCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
                assertFalse(savepointFuture1.isDone());
@@ -1635,9 +1636,15 @@ public void testSavepointsAreNotSubsumed() throws 
Exception {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1));
 
-               assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertTrue(savepointFuture1.isDone());
+
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId3));
+               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId3));
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
        }
 
        private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3460,6 +3467,92 @@ public void testCheckpointStatsTrackerRestoreCallback() 
throws Exception {
                        
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
        }
 
+       /**
+        * FLINK-6328
+        *
+        * Tests that savepoints are not added to the {@link 
CompletedCheckpointStore} and,
+        * thus, are not subject to job recovery. The reason that we don't want 
that (until
+        * FLINK-4815 has been finished) is that the lifecycle of savepoints is 
not controlled
+        * by the {@link CheckpointCoordinator}.
+        */
+       @Test
+       public void testSavepointsAreNotAddedToCompletedCheckpointStore() 
throws Exception {
+               final JobID jobId = new JobID();
+               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
+               final ExecutionVertex vertex1 = 
mockExecutionVertex(executionAttemptId);
+               final CompletedCheckpointStore completedCheckpointStore = new 
StandaloneCompletedCheckpointStore(1);
+               final long checkpointTimestamp1 = 1L;
+               final long savepointTimestamp = 2L;
+               final long checkpointTimestamp2 = 3L;
+               final String savepointDir = 
tmpFolder.newFolder().getAbsolutePath();
+
+               final StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
+
+               CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinator(
+                       jobId,
+                       600000L,
+                       600000L,
+                       0L,
+                       Integer.MAX_VALUE,
+                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       new ExecutionVertex[]{vertex1},
+                       new ExecutionVertex[]{vertex1},
+                       new ExecutionVertex[]{vertex1},
+                       checkpointIDCounter,
+                       completedCheckpointStore,
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
+
+               // trigger a first checkpoint
+               assertTrue(
+                       "Triggering of a checkpoint should work.",
+                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
+
+               assertTrue(0 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+               // complete the 1st checkpoint
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               checkpointIDCounter.getLast()));
+
+               // check that the checkpoint has been completed
+               assertTrue(1 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
+
+               // trigger a savepoint --> this should not have any effect on 
the CompletedCheckpointStore
+               CompletableFuture<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               checkpointIDCounter.getLast()));
+
+               // check that no errors occurred
+               final CompletedCheckpoint savepoint = savepointFuture.get();
+
+               assertFalse(
+                       "The savepoint should not have been added to the 
completed checkpoint store",
+                       savepoint.getCheckpointID() == 
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
+
+               assertTrue(
+                       "Triggering of a checkpoint should work.",
+                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
+
+               // complete the 2nd checkpoint
+               checkpointCoordinator.receiveAcknowledgeMessage(
+                       new AcknowledgeCheckpoint(
+                               jobId,
+                               executionAttemptId,
+                               checkpointIDCounter.getLast()));
+
+               assertTrue(
+                       "The latest completed (proper) checkpoint should have 
been added to the completed checkpoint store.",
+                       
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
+       }
+
        @Test
        public void testSharedStateRegistrationOnRestore() throws Exception {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 3ee1b927b53..6e0f9c5c638 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -165,7 +165,6 @@ public void testSlotAllocation() throws Exception {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        new BlobCacheService(
                                configuration,
                                new VoidBlobStore(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 179dea27835..4af052978f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -279,7 +279,6 @@ public void testHeartbeatTimeoutWithJobManager() throws 
Exception {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -370,7 +369,6 @@ public void testHeartbeatTimeoutWithResourceManager() 
throws Exception {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -486,7 +484,6 @@ public void testHeartbeatSlotReporting() throws Exception {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -565,7 +562,6 @@ public void testImmediatelyRegistersIfLeaderIsKnown() 
throws Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -629,7 +625,6 @@ public void testTriggerRegistrationOnLeaderChange() throws 
Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -752,7 +747,6 @@ public void testTaskSubmission() throws Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -852,7 +846,6 @@ public void testJobLeaderDetection() throws Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -884,7 +877,7 @@ public void testJobLeaderDetection() throws Exception {
                        // the job leader should get the allocation id offered
                        verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                        any(ResourceID.class),
-                                       (Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
+                                       (Collection<SlotOffer>) Matchers.argThat(contains(slotOffer)),
                                        any(Time.class));
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskManager, timeout);
@@ -967,7 +960,6 @@ public void testSlotAcceptance() throws Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1062,7 +1054,6 @@ public void testSubmitTaskBeforeAcceptSlot() throws 
Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1169,7 +1160,6 @@ public void 
testFilterOutDuplicateJobMasterRegistrations() throws Exception {
                        taskManagerServices,
                        heartbeatServicesMock,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1243,7 +1233,6 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() 
throws Exception {
                        taskManagerServices,
                        heartbeatServices,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1300,7 +1289,6 @@ public void testRemoveJobFromJobLeaderService() throws 
Exception {
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1393,7 +1381,6 @@ public void 
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
                        taskManagerServices,
                        new HeartbeatServices(heartbeatInterval, 10L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1501,7 +1488,6 @@ public void 
testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
                                .build(),
                        new HeartbeatServices(heartbeatInterval, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
 
@@ -1713,7 +1699,6 @@ private TaskExecutor 
createTaskExecutor(TaskManagerServices taskManagerServices)
                        taskManagerServices,
                        new HeartbeatServices(1000L, 1000L),
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       null,
                        dummyBlobCacheService,
                        testingFatalErrorHandler);
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 5fd12a84ac4..a9e99495e34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -34,7 +34,6 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
@@ -150,11 +149,6 @@ public void disconnectResourceManager(Exception cause) {
                return FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
        }
 
-       @Override
-       public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time timeout) {
-               return CompletableFuture.completedFuture(SerializableOptional.of(address));
-       }
-
        @Override
        public String getAddress() {
                return address;
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index e5c1668df0a..d02a55483bf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,30 +167,4 @@ class AkkaUtilsTest
     akkaConfig.getString("akka.remote.netty.tcp.hostname") should
       equal(NetUtils.unresolvedHostToNormalizedString(hostname))
   }
-
-  test("null hostname should go to localhost") {
-    val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772)))
-
-    val hostname = configure.getString("akka.remote.netty.tcp.hostname")
-
-    InetAddress.getByName(hostname).isLoopbackAddress should be(true)
-  }
-
-  test("getAkkaConfig defaults to fork-join-executor") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(new Configuration())
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("fork-join-executor")
-  }
-
-  test("getAkkaConfig respects executor config") {
-    val akkaConfig = AkkaUtils.getAkkaConfig(
-      new Configuration(),
-      "localhost",
-      1234,
-      AkkaUtils.getThreadPoolExecutorConfig)
-
-    akkaConfig.getString("akka.actor.default-dispatcher.executor") should
-      equal("thread-pool-executor")
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to