[FLINK-6129] [metrics] Stop query actor of MetricRegistry

This PR properly stops the query actor of the MetricRegistry when the
registry is shut down.

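It also adds locking to the MetricRegistry so that starting the query service, registering/unregistering metrics, and shutting down are mutually exclusive.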

This closes #3573.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8319a457
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8319a457
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8319a457

Branch: refs/heads/table-retraction
Commit: 8319a457d9adee310ef64905709c03ca2f2afd61
Parents: 3e860b4
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Mar 20 14:55:30 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Mar 23 15:13:33 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/metrics/MetricRegistry.java   | 157 +++++++++++++------
 .../runtime/metrics/MetricRegistryTest.java     |  34 ++++
 2 files changed, 142 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index d3b21fc..9f46d47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.metrics;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -34,8 +37,12 @@ import 
org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -52,7 +59,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class MetricRegistry {
        static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-       
+
+       private final Object lock = new Object();
+
        private List<MetricReporter> reporters;
        private ScheduledExecutorService executor;
        private ActorRef queryService;
@@ -150,10 +159,14 @@ public class MetricRegistry {
         * @param resourceID resource ID used to disambiguate the actor name
      */
        public void startQueryService(ActorSystem actorSystem, ResourceID 
resourceID) {
-               try {
-                       queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
-               } catch (Exception e) {
-                       LOG.warn("Could not start MetricDumpActor. No metrics 
will be submitted to the WebInterface.", e);
+               synchronized (lock) {
+                       Preconditions.checkState(!isShutdown(), "The metric 
registry has already been shut down.");
+
+                       try {
+                               queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+                       } catch (Exception e) {
+                               LOG.warn("Could not start MetricDumpActor. No 
metrics will be submitted to the WebInterface.", e);
+                       }
                }
        }
 
@@ -191,24 +204,51 @@ public class MetricRegistry {
         * @return true, if this registry was shutdown, otherwise false
         */
        public boolean isShutdown() {
-               return reporters == null && executor.isShutdown();
+               synchronized (lock) {
+                       return reporters == null && executor.isShutdown();
+               }
        }
 
        /**
         * Shuts down this registry and the associated {@link MetricReporter}.
         */
        public void shutdown() {
-               if (reporters != null) {
-                       for (MetricReporter reporter : reporters) {
+               synchronized (lock) {
+                       Future<Boolean> stopFuture = null;
+                       FiniteDuration stopTimeout = null;
+
+                       if (queryService != null) {
+                               stopTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
+                               stopFuture = 
Patterns.gracefulStop(queryService, stopTimeout);
+                       }
+
+                       if (reporters != null) {
+                               for (MetricReporter reporter : reporters) {
+                                       try {
+                                               reporter.close();
+                                       } catch (Throwable t) {
+                                               LOG.warn("Metrics reporter did 
not shut down cleanly", t);
+                                       }
+                               }
+                               reporters = null;
+                       }
+                       shutdownExecutor();
+
+                       if (stopFuture != null) {
+                               boolean stopped = false;
+
                                try {
-                                       reporter.close();
-                               } catch (Throwable t) {
-                                       LOG.warn("Metrics reporter did not shut 
down cleanly", t);
+                                       stopped = Await.result(stopFuture, 
stopTimeout);
+                               } catch (Exception e) {
+                                       LOG.warn("Query actor did not properly 
stop.", e);
+                               }
+
+                               if (!stopped) {
+                                       // the query actor did not stop in 
time, let's kill him
+                                       queryService.tell(Kill.getInstance(), 
ActorRef.noSender());
                                }
                        }
-                       reporters = null;
                }
-               shutdownExecutor();
        }
        
        private void shutdownExecutor() {
@@ -216,7 +256,7 @@ public class MetricRegistry {
                        executor.shutdown();
 
                        try {
-                               if (!executor.awaitTermination(1, 
TimeUnit.SECONDS)) {
+                               if (!executor.awaitTermination(1L, 
TimeUnit.SECONDS)) {
                                        executor.shutdownNow();
                                }
                        } catch (InterruptedException e) {
@@ -241,27 +281,33 @@ public class MetricRegistry {
         * @param group       the group that contains the metric
         */
        public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               try {
-                       if (reporters != null) {
-                               for (int i = 0; i < reporters.size(); i++) {
-                                       MetricReporter reporter = 
reporters.get(i);
-                                       if (reporter != null) {
-                                               FrontMetricGroup front = new 
FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-                                               
reporter.notifyOfAddedMetric(metric, metricName, front);
+               synchronized (lock) {
+                       if (isShutdown()) {
+                               LOG.warn("Cannot register metric, because the 
MetricRegistry has already been shut down.");
+                       } else {
+                               try {
+                                       if (reporters != null) {
+                                               for (int i = 0; i < 
reporters.size(); i++) {
+                                                       MetricReporter reporter 
= reporters.get(i);
+                                                       if (reporter != null) {
+                                                               
FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+                                                               
reporter.notifyOfAddedMetric(metric, metricName, front);
+                                                       }
+                                               }
                                        }
+                                       if (queryService != null) {
+                                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+                                       }
+                                       if (metric instanceof View) {
+                                               if (viewUpdater == null) {
+                                                       viewUpdater = new 
ViewUpdater(executor);
+                                               }
+                                               
viewUpdater.notifyOfAddedView((View) metric);
+                                       }
+                               } catch (Exception e) {
+                                       LOG.error("Error while registering 
metric.", e);
                                }
                        }
-                       if (queryService != null) {
-                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
-                       }
-                       if (metric instanceof View) {
-                               if (viewUpdater == null) {
-                                       viewUpdater = new ViewUpdater(executor);
-                               }
-                               viewUpdater.notifyOfAddedView((View) metric);
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while registering metric.", e);
                }
        }
 
@@ -273,31 +319,44 @@ public class MetricRegistry {
         * @param group       the group that contains the metric
         */
        public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               try {
-                       if (reporters != null) {
-                               for (int i = 0; i < reporters.size(); i++) {
-                                       MetricReporter reporter = 
reporters.get(i);
-                                       if (reporter != null) {
-                                               FrontMetricGroup front = new 
FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-                                               
reporter.notifyOfRemovedMetric(metric, metricName, front);
+               synchronized (lock) {
+                       if (isShutdown()) {
+                               LOG.warn("Cannot unregister metric, because the 
MetricRegistry has already been shut down.");
+                       } else {
+                               try {
+                                       if (reporters != null) {
+                                               for (int i = 0; i < 
reporters.size(); i++) {
+                                                       MetricReporter reporter 
= reporters.get(i);
+                                                       if (reporter != null) {
+                                                               
FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+                                                               
reporter.notifyOfRemovedMetric(metric, metricName, front);
+                                                       }
+                                               }
                                        }
+                                       if (queryService != null) {
+                                               
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+                                       }
+                                       if (metric instanceof View) {
+                                               if (viewUpdater != null) {
+                                                       
viewUpdater.notifyOfRemovedView((View) metric);
+                                               }
+                                       }
+                               } catch (Exception e) {
+                                       LOG.error("Error while registering 
metric.", e);
                                }
                        }
-                       if (queryService != null) {
-                               
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
-                       }
-                       if (metric instanceof View) {
-                               if (viewUpdater != null) {
-                                       viewUpdater.notifyOfRemovedView((View) 
metric);
-                               }
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while registering metric.", e);
                }
        }
 
        // 
------------------------------------------------------------------------
 
+       @VisibleForTesting
+       public ActorRef getQueryService() {
+               return queryService;
+       }
+
+       // 
------------------------------------------------------------------------
+
        /**
         * This task is explicitly a static class, so that it does not hold any 
references to the enclosing
         * MetricsRegistry instance.

http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index ab4e7a4..fe29ccb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.metrics;
 
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
@@ -26,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.runtime.metrics.util.TestReporter;
@@ -33,11 +37,15 @@ import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class MetricRegistryTest extends TestLogger {
 
@@ -348,6 +356,32 @@ public class MetricRegistryTest extends TestLogger {
                assertEquals(4, 
TestReporter8.numCorrectDelimitersForUnregister);
        }
 
+       /**
+        * Tests that the query actor will be stopped when the MetricRegistry 
is shut down.
+        */
+       @Test
+       public void testQueryActorShutdown() throws Exception {
+               final FiniteDuration timeout = new FiniteDuration(10L, 
TimeUnit.SECONDS);
+
+               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+               final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+
+               registry.startQueryService(actorSystem, null);
+
+               ActorRef queryServiceActor = registry.getQueryService();
+
+               registry.shutdown();
+
+               try {
+                       
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout),
 timeout);
+
+                       fail("The query actor should be terminated resulting in 
a ActorNotFound exception.");
+               } catch (ActorNotFound e) {
+                       // we expect the query actor to be shut down
+               }
+       }
+
        public static class TestReporter8 extends TestReporter {
                char expectedDelimiter;
                public static int numCorrectDelimitersForRegister = 0;

Reply via email to