[FLINK-6129] [metrics] Stop query actor of MetricRegistry. This PR properly shuts down the query actor of the MetricRegistry when the registry itself is shut down.
Add locking to the MetricRegistry This closes #3573. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8319a457 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8319a457 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8319a457 Branch: refs/heads/table-retraction Commit: 8319a457d9adee310ef64905709c03ca2f2afd61 Parents: 3e860b4 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Mar 20 14:55:30 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Mar 23 15:13:33 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/metrics/MetricRegistry.java | 157 +++++++++++++------ .../runtime/metrics/MetricRegistryTest.java | 34 ++++ 2 files changed, 142 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index d3b21fc..9f46d47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.metrics; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.pattern.Patterns; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -34,8 +37,12 @@ import org.apache.flink.runtime.metrics.dump.MetricQueryService; import 
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; @@ -52,7 +59,9 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class MetricRegistry { static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); - + + private final Object lock = new Object(); + private List<MetricReporter> reporters; private ScheduledExecutorService executor; private ActorRef queryService; @@ -150,10 +159,14 @@ public class MetricRegistry { * @param resourceID resource ID used to disambiguate the actor name */ public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) { - try { - queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID); - } catch (Exception e) { - LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e); + synchronized (lock) { + Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down."); + + try { + queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID); + } catch (Exception e) { + LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e); + } } } @@ -191,24 +204,51 @@ public class MetricRegistry { * @return true, if this registry was shutdown, otherwise false */ public boolean isShutdown() { - return reporters == null && executor.isShutdown(); + synchronized (lock) { + return reporters == null && executor.isShutdown(); + } } /** * Shuts down this registry and the associated {@link MetricReporter}. 
*/ public void shutdown() { - if (reporters != null) { - for (MetricReporter reporter : reporters) { + synchronized (lock) { + Future<Boolean> stopFuture = null; + FiniteDuration stopTimeout = null; + + if (queryService != null) { + stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } + + if (reporters != null) { + for (MetricReporter reporter : reporters) { + try { + reporter.close(); + } catch (Throwable t) { + LOG.warn("Metrics reporter did not shut down cleanly", t); + } + } + reporters = null; + } + shutdownExecutor(); + + if (stopFuture != null) { + boolean stopped = false; + try { - reporter.close(); - } catch (Throwable t) { - LOG.warn("Metrics reporter did not shut down cleanly", t); + stopped = Await.result(stopFuture, stopTimeout); + } catch (Exception e) { + LOG.warn("Query actor did not properly stop.", e); + } + + if (!stopped) { + // the query actor did not stop in time, let's kill him + queryService.tell(Kill.getInstance(), ActorRef.noSender()); } } - reporters = null; } - shutdownExecutor(); } private void shutdownExecutor() { @@ -216,7 +256,7 @@ public class MetricRegistry { executor.shutdown(); try { - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { @@ -241,27 +281,33 @@ public class MetricRegistry { * @param group the group that contains the metric */ public void register(Metric metric, String metricName, AbstractMetricGroup group) { - try { - if (reporters != null) { - for (int i = 0; i < reporters.size(); i++) { - MetricReporter reporter = reporters.get(i); - if (reporter != null) { - FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); - reporter.notifyOfAddedMetric(metric, metricName, front); + synchronized (lock) { + if (isShutdown()) { + LOG.warn("Cannot register metric, because the MetricRegistry has already been 
shut down."); + } else { + try { + if (reporters != null) { + for (int i = 0; i < reporters.size(); i++) { + MetricReporter reporter = reporters.get(i); + if (reporter != null) { + FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); + reporter.notifyOfAddedMetric(metric, metricName, front); + } + } } + if (queryService != null) { + MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group); + } + if (metric instanceof View) { + if (viewUpdater == null) { + viewUpdater = new ViewUpdater(executor); + } + viewUpdater.notifyOfAddedView((View) metric); + } + } catch (Exception e) { + LOG.error("Error while registering metric.", e); } } - if (queryService != null) { - MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group); - } - if (metric instanceof View) { - if (viewUpdater == null) { - viewUpdater = new ViewUpdater(executor); - } - viewUpdater.notifyOfAddedView((View) metric); - } - } catch (Exception e) { - LOG.error("Error while registering metric.", e); } } @@ -273,31 +319,44 @@ public class MetricRegistry { * @param group the group that contains the metric */ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { - try { - if (reporters != null) { - for (int i = 0; i < reporters.size(); i++) { - MetricReporter reporter = reporters.get(i); - if (reporter != null) { - FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); - reporter.notifyOfRemovedMetric(metric, metricName, front); + synchronized (lock) { + if (isShutdown()) { + LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down."); + } else { + try { + if (reporters != null) { + for (int i = 0; i < reporters.size(); i++) { + MetricReporter reporter = reporters.get(i); + if (reporter != null) { + FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); + reporter.notifyOfRemovedMetric(metric, metricName, front); + } + } } + 
if (queryService != null) { + MetricQueryService.notifyOfRemovedMetric(queryService, metric); + } + if (metric instanceof View) { + if (viewUpdater != null) { + viewUpdater.notifyOfRemovedView((View) metric); + } + } + } catch (Exception e) { + LOG.error("Error while registering metric.", e); } } - if (queryService != null) { - MetricQueryService.notifyOfRemovedMetric(queryService, metric); - } - if (metric instanceof View) { - if (viewUpdater != null) { - viewUpdater.notifyOfRemovedView((View) metric); - } - } - } catch (Exception e) { - LOG.error("Error while registering metric.", e); } } // ------------------------------------------------------------------------ + @VisibleForTesting + public ActorRef getQueryService() { + return queryService; + } + + // ------------------------------------------------------------------------ + /** * This task is explicitly a static class, so that it does not hold any references to the enclosing * MetricsRegistry instance. http://git-wip-us.apache.org/repos/asf/flink/blob/8319a457/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index ab4e7a4..fe29ccb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.metrics; +import akka.actor.ActorNotFound; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; @@ -26,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; 
import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.metrics.util.TestReporter; @@ -33,11 +37,15 @@ import org.apache.flink.runtime.metrics.util.TestReporter; import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class MetricRegistryTest extends TestLogger { @@ -348,6 +356,32 @@ public class MetricRegistryTest extends TestLogger { assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); } + /** + * Tests that the query actor will be stopped when the MetricRegistry is shut down. + */ + @Test + public void testQueryActorShutdown() throws Exception { + final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + + final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + + registry.startQueryService(actorSystem, null); + + ActorRef queryServiceActor = registry.getQueryService(); + + registry.shutdown(); + + try { + Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); + + fail("The query actor should be terminated resulting in a ActorNotFound exception."); + } catch (ActorNotFound e) { + // we expect the query actor to be shut down + } + } + public static class TestReporter8 extends TestReporter { char expectedDelimiter; public static int numCorrectDelimitersForRegister = 0;