Repository: flink
Updated Branches:
  refs/heads/master 98d182603 -> f7f7b487b


[FLINK-5179] [metrics] Close TaskManagerMetricGroup on JobManager dissociation

This closes #2886.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5f4e3d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5f4e3d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5f4e3d8

Branch: refs/heads/master
Commit: e5f4e3d8aa330ebb88166a668b59b8d1730cb618
Parents: 98d1826
Author: zentol <ches...@apache.org>
Authored: Mon Nov 28 16:10:33 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Dec 6 14:03:08 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/metrics/MetricRegistry.java   |   9 ++
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 .../runtime/metrics/MetricRegistryTest.java     |  11 ++
 .../runtime/metrics/TaskManagerMetricsTest.java | 133 +++++++++++++++++++
 4 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index c17fdb4..f4510db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -185,6 +185,15 @@ public class MetricRegistry {
        }
 
        /**
+        * Returns whether this registry has been shut down, i.e. whether its {@code reporters} field has been set to {@code null} and its executor has been shut down.
+        *
+        * @return true, if this registry was shut down, otherwise false
+        */
+       public boolean isShutdown() {
+               return reporters == null && executor.isShutdown();
+       }
+
+       /**
         * Shuts down this registry and the associated {@link MetricReporter}.
         */
        public void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3b1382..271578f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1046,11 +1046,10 @@ class TaskManager(
       network.getKvStateRegistry.unregisterListener()
     }
     
-    // failsafe shutdown of the metrics registry
     try {
-      metricsRegistry.shutdown()
+      taskManagerMetricGroup.close()
     } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
+      case t: Exception => log.warn("TaskManagerMetricGroup could not be closed successfully.", t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 1157215..ab4e7a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -42,6 +42,23 @@ import static org.junit.Assert.assertTrue;
 public class MetricRegistryTest extends TestLogger {
 
        private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+       /**
+        * Verifies that {@link MetricRegistry#isShutdown()} returns false for a freshly created
+        * registry and true after {@link MetricRegistry#shutdown()} has been called.
+        */
+       @Test
+       public void testIsShutdown() {
+               MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+               try {
+                       Assert.assertFalse(metricRegistry.isShutdown());
+               } finally {
+                       // release the registry even if the assertion above fails
+                       metricRegistry.shutdown();
+               }
+
+               Assert.assertTrue(metricRegistry.isShutdown());
+       }
        
        /**
         * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/e5f4e3d8/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
new file mode 100644
index 0000000..ad3de33
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Option;
+import scala.Tuple7;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for the interplay between the {@link TaskManager} and its {@link MetricRegistry}.
+ */
+public class TaskManagerMetricsTest extends TestLogger {
+
+       /**
+        * Tests the metric registry life cycle on JobManager re-connects: the registry of a
+        * TaskManager that disconnects from its JobManager and registers again must not be shut down.
+        */
+       @Test
+       public void testMetricRegistryLifeCycle() {
+               ActorSystem actorSystem = null;
+               MetricRegistry metricRegistry = null;
+               try {
+                       actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+                       // ================================================================
+                       // Start JobManager
+                       // ================================================================
+                       final ActorRef jobManager = JobManager.startJobManagerActors(
+                               new Configuration(),
+                               actorSystem,
+                               actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
+                               JobManager.class,
+                               MemoryArchivist.class)._1();
+
+                       LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
+
+                       // ================================================================
+                       // Start TaskManager
+                       // ================================================================
+                       ResourceID tmResourceID = ResourceID.generate();
+
+                       Tuple7<TaskManagerConfiguration, TaskManagerLocation, MemoryManager, IOManager, NetworkEnvironment, LeaderRetrievalService, MetricRegistry> components =
+                               TaskManager.createTaskManagerComponents(
+                                       new Configuration(),
+                                       tmResourceID,
+                                       "localhost",
+                                       true,
+                                       Option.apply(leaderRetrievalService)
+                               );
+
+                       // the registry was created for this test, so the test shuts it down in the finally block
+                       metricRegistry = components._7();
+
+                       // create the task manager
+                       final Props tmProps = TaskManager.getTaskManagerProps(
+                               TaskManager.class,
+                               components._1(),
+                               tmResourceID,
+                               components._2(),
+                               components._3(),
+                               components._4(),
+                               components._5(),
+                               components._6(),
+                               components._7());
+
+                       final ActorRef taskManager = actorSystem.actorOf(tmProps);
+
+                       new JavaTestKit(actorSystem) {{
+                               // upper bound for the initial registration plus the re-registration
+                               new Within(new FiniteDuration(30, TimeUnit.SECONDS)) {
+                                       @Override
+                                       protected void run() {
+                                               taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+                                                       getTestActor());
+
+                                               // wait for the TM to be registered
+                                               expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+
+                                               // trigger re-registration of TM; this should include a disconnect from the current JM
+                                               taskManager.tell(new TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), null), jobManager);
+
+                                               // wait for re-registration to be completed
+                                               taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+                                                       getTestActor());
+
+                                               expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
+                                       }
+                               };
+                       }};
+
+                       // verify that the registry was not shut down due to the disconnect
+                       Assert.assertFalse(metricRegistry.isShutdown());
+               } finally {
+                       try {
+                               // shut down the actors and the actor system
+                               if (actorSystem != null) {
+                                       actorSystem.shutdown();
+                                       actorSystem.awaitTermination();
+                               }
+                       } finally {
+                               if (metricRegistry != null) {
+                                       metricRegistry.shutdown();
+                               }
+                       }
+               }
+       }
+}

Reply via email to