http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 1a3ca70..d8e65a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -45,7 +45,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; @@ -502,7 +502,7 @@ public class ResourceManagerTest extends TestLogger { final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); - final MetricRegistry metricRegistry = mock(MetricRegistry.class); + final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final SlotManager slotManager = new SlotManager( @@ -601,7 +601,7 @@ public class ResourceManagerTest extends TestLogger { 
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); - final MetricRegistry metricRegistry = mock(MetricRegistry.class); + final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index d1ca757..8558145 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -114,7 +114,7 @@ public class DispatcherTest extends TestLogger { mock(ResourceManagerGateway.class), mock(BlobServer.class), heartbeatServices, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), fatalErrorHandler, jobManagerRunner, jobId); @@ -174,7 +174,7 @@ public class DispatcherTest extends TestLogger { mock(ResourceManagerGateway.class), mock(BlobServer.class), heartbeatServices, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), fatalErrorHandler, mock(JobManagerRunner.class), jobId); @@ -209,7 +209,7 @@ public class DispatcherTest extends TestLogger { ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, FatalErrorHandler fatalErrorHandler, JobManagerRunner 
jobManagerRunner, JobID expectedJobId) throws Exception { @@ -238,7 +238,7 @@ public class DispatcherTest extends TestLogger { HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, OnCompletionActions onCompleteActions, FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 7df26fc..d843da2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -62,7 +62,9 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -206,7 +208,7 @@ public class JobManagerHARecoveryTest extends TestLogger { mySubmittedJobGraphStore, checkpointStateFactory, 
jobRecoveryTimeout, - Option.<MetricRegistry>empty(), + Option.<MetricRegistryImpl>empty(), Option.<String>empty()); jobManager = system.actorOf(jobManagerProps); @@ -217,6 +219,7 @@ public class JobManagerHARecoveryTest extends TestLogger { ResourceID.generate(), system, testingHighAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("taskmanager"), true, @@ -380,7 +383,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphStore, mock(CheckpointRecoveryFactory.class), jobRecoveryTimeout, - Option.<MetricRegistry>apply(null), + Option.<MetricRegistryImpl>apply(null), recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()); jobManager = system.actorOf(jobManagerProps); @@ -418,7 +421,7 @@ public class JobManagerHARecoveryTest extends TestLogger { SubmittedJobGraphStore submittedJobGraphs, CheckpointRecoveryFactory checkpointRecoveryFactory, FiniteDuration jobRecoveryTimeout, - Option<MetricRegistry> metricsRegistry, + JobManagerMetricGroup jobManagerMetricGroup, Collection<JobID> recoveredJobs) { super( flinkConfiguration, @@ -435,7 +438,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, Option.empty()); this.recoveredJobs = recoveredJobs; http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index bd7f11f..a697aae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -75,6 +75,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered; @@ -624,6 +625,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), TestingJobManager.class, MemoryArchivist.class)._1(); @@ -645,6 +647,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), system, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", scala.Option.<String>empty(), true, @@ -841,6 +844,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -859,6 +863,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, @@ -1051,6 +1056,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1069,6 +1075,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, @@ -1164,6 +1171,7 @@ public class JobManagerTest extends TestLogger { 
TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1182,6 +1190,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, @@ -1275,6 +1284,7 @@ public class JobManagerTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -1296,6 +1306,7 @@ public class JobManagerTest extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 340a735..cc93f18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -94,6 +95,7 @@ public class 
JobSubmitTest { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index b4f50fb..083d6e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -115,7 +115,7 @@ public class JobManagerRunnerMockTest extends TestLogger { haServices, heartbeatServices, JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)), - new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()), + new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()), jobCompletion, jobCompletion)); } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index c3b57fa..e4ceb40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -28,7 +28,6 @@ import akka.util.Timeout; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.BlobServerOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; @@ -43,7 +42,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -199,7 +198,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { submittedJobGraphStore, checkpointRecoveryFactory, AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - Option.<MetricRegistry>empty(), + Option.<MetricRegistryImpl>empty(), Option.<String>empty()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java new file mode 100644 index 0000000..b0b20b2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.metrics.groups.MetricGroupTest; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.runtime.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorNotFound; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.Await; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link MetricRegistryImpl}. + */ +public class MetricRegistryImplTest extends TestLogger { + + private static final char GLOBAL_DEFAULT_DELIMITER = '.'; + + @Test + public void testIsShutdown() { + MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + + Assert.assertFalse(metricRegistry.isShutdown()); + + metricRegistry.shutdown(); + + Assert.assertTrue(metricRegistry.isShutdown()); + } + + /** + * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. 
+ */ + @Test + public void testReporterInstantiation() { + Configuration config = new Configuration(); + + config.setString(MetricOptions.REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + + MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + assertTrue(metricRegistry.getReporters().size() == 1); + + Assert.assertTrue(TestReporter1.wasOpened); + + metricRegistry.shutdown(); + } + + /** + * Reporter that exposes whether open() was called. + */ + protected static class TestReporter1 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** + * Verifies that multiple reporters are instantiated correctly. + */ + @Test + public void testMultipleReporterInstantiation() { + Configuration config = new Configuration(); + + config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); + + MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + assertTrue(metricRegistry.getReporters().size() == 3); + + Assert.assertTrue(TestReporter11.wasOpened); + Assert.assertTrue(TestReporter12.wasOpened); + Assert.assertTrue(TestReporter13.wasOpened); + + metricRegistry.shutdown(); + } + + /** + * Reporter that exposes whether open() was called. 
+ */ + protected static class TestReporter11 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** + * Reporter that exposes whether open() was called. + */ + protected static class TestReporter12 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** + * Reporter that exposes whether open() was called. + */ + protected static class TestReporter13 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** + * Verifies that configured arguments are properly forwarded to the reporter. + */ + @Test + public void testReporterArgumentForwarding() { + Configuration config = new Configuration(); + + config.setString(MetricOptions.REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); + + new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown(); + + Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null)); + Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null)); + } + + /** + * Reporter that exposes the {@link MetricConfig} it was given. + */ + protected static class TestReporter2 extends TestReporter { + static MetricConfig mc; + @Override + public void open(MetricConfig config) { + mc = config; + } + } + + /** + * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. 
+ * + * @throws InterruptedException + */ + @Test + public void testReporterScheduling() throws InterruptedException { + Configuration config = new Configuration(); + + config.setString(MetricOptions.REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + long start = System.currentTimeMillis(); + + // only start counting from now on + TestReporter3.reportCount = 0; + + for (int x = 0; x < 10; x++) { + Thread.sleep(100); + int reportCount = TestReporter3.reportCount; + long curT = System.currentTimeMillis(); + /** + * Within a given time-frame T only T/50 reports may be triggered due to the 50 ms interval between reports. + * This value however does not take the first triggered report into account (=> +1). + * Furthermore we have to account for the mis-alignment between reports being triggered and our time + * measurement (=> +1); for T=200 a total of 4-6 reports may have been + * triggered depending on whether the end of the interval for the first reports ends before + * or after T=50. + */ + long maxAllowedReports = (curT - start) / 50 + 2; + Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount); + } + Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); + + registry.shutdown(); + } + + /** + * Reporter that exposes how often report() was called.
+ */ + protected static class TestReporter3 extends TestReporter implements Scheduled { + public static int reportCount = 0; + + @Override + public void report() { + reportCount++; + } + } + + /** + * Verifies that reporters are notified of added/removed metrics. + */ + @Test + public void testReporterNotifications() { + Configuration config = new Configuration(); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + root.counter("rootCounter"); + + assertTrue(TestReporter6.addedMetric instanceof Counter); + assertEquals("rootCounter", TestReporter6.addedMetricName); + + assertTrue(TestReporter7.addedMetric instanceof Counter); + assertEquals("rootCounter", TestReporter7.addedMetricName); + + root.close(); + + assertTrue(TestReporter6.removedMetric instanceof Counter); + assertEquals("rootCounter", TestReporter6.removedMetricName); + + assertTrue(TestReporter7.removedMetric instanceof Counter); + assertEquals("rootCounter", TestReporter7.removedMetricName); + + registry.shutdown(); + } + + /** + * Reporter that exposes the name and metric instance of the last metric that was added or removed. 
+ */ + protected static class TestReporter6 extends TestReporter { + static Metric addedMetric; + static String addedMetricName; + + static Metric removedMetric; + static String removedMetricName; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + addedMetric = metric; + addedMetricName = metricName; + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + removedMetric = metric; + removedMetricName = metricName; + } + } + + /** + * Reporter that exposes the name and metric instance of the last metric that was added or removed. + */ + protected static class TestReporter7 extends TestReporter { + static Metric addedMetric; + static String addedMetricName; + + static Metric removedMetric; + static String removedMetricName; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + addedMetric = metric; + addedMetricName = metricName; + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + removedMetric = metric; + removedMetricName = metricName; + } + } + + /** + * Verifies that the scope configuration is properly extracted. 
+ */ + @Test + public void testScopeConfig() { + Configuration config = new Configuration(); + + config.setString(MetricOptions.SCOPE_NAMING_TM, "A"); + config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B"); + config.setString(MetricOptions.SCOPE_NAMING_TASK, "C"); + config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D"); + + ScopeFormats scopeConfig = ScopeFormats.fromConfig(config); + + assertEquals("A", scopeConfig.getTaskManagerFormat().format()); + assertEquals("B", scopeConfig.getTaskManagerJobFormat().format()); + assertEquals("C", scopeConfig.getTaskFormat().format()); + assertEquals("D", scopeConfig.getOperatorFormat().format()); + } + + @Test + public void testConfigurableDelimiter() { + Configuration config = new Configuration(); + config.setString(MetricOptions.SCOPE_DELIMITER, "_"); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); + assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); + + registry.shutdown(); + } + + @Test + public void testConfigurableDelimiterForReporters() { + Configuration config = new Configuration(); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." 
+ ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter()); + assertEquals('_', registry.getDelimiter(0)); + assertEquals('-', registry.getDelimiter(1)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1)); + + registry.shutdown(); + } + + @Test + public void testConfigurableDelimiterForReportersInGroup() { + Configuration config = new Configuration(); + config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B"); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + List<MetricReporter> reporters = registry.getReporters(); + ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter + ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter + ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter + ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter + + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); + group.counter("C"); + group.close(); + registry.shutdown(); + assertEquals(4, TestReporter8.numCorrectDelimitersForRegister); + assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); + } + + /** + * Tests that the query actor will be stopped when the MetricRegistry is shut down. 
+ */ + @Test + public void testQueryActorShutdown() throws Exception { + final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + + final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + + registry.startQueryService(actorSystem, null); + + ActorRef queryServiceActor = registry.getQueryService(); + + registry.shutdown(); + + try { + Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); + + fail("The query actor should be terminated resulting in a ActorNotFound exception."); + } catch (ActorNotFound e) { + // we expect the query actor to be shut down + } + } + + /** + * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier. + */ + public static class TestReporter8 extends TestReporter { + char expectedDelimiter; + public static int numCorrectDelimitersForRegister = 0; + public static int numCorrectDelimitersForUnregister = 0; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; + assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); + assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); + numCorrectDelimitersForRegister++; + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; + assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); + assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); + numCorrectDelimitersForUnregister++; + } + } + + @Test + public void testExceptionIsolation() throws Exception { + + Configuration config = new Configuration(); + 
config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); + + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); + + Counter metric = new SimpleCounter(); + registry.register(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry)); + + assertEquals(metric, TestReporter7.addedMetric); + assertEquals("counter", TestReporter7.addedMetricName); + + registry.unregister(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry)); + + assertEquals(metric, TestReporter7.removedMetric); + assertEquals("counter", TestReporter7.removedMetricName); + + registry.shutdown(); + } + + /** + * Reporter that throws an exception when it is notified of an added or removed metric. 
+ */ + protected static class FailingReporter extends TestReporter { + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + throw new RuntimeException(); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + throw new RuntimeException(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java deleted file mode 100644 index 284b86a..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ /dev/null @@ -1,496 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.metrics; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricConfig; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.metrics.reporter.Scheduled; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.metrics.groups.MetricGroupTest; -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.runtime.metrics.scope.ScopeFormats; -import org.apache.flink.runtime.metrics.util.TestReporter; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorNotFound; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.Await; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the {@link MetricRegistry}. - */ -public class MetricRegistryTest extends TestLogger { - - private static final char GLOBAL_DEFAULT_DELIMITER = '.'; - - @Test - public void testIsShutdown() { - MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - - Assert.assertFalse(metricRegistry.isShutdown()); - - metricRegistry.shutdown(); - - Assert.assertTrue(metricRegistry.isShutdown()); - } - - /** - * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. 
- */ - @Test - public void testReporterInstantiation() { - Configuration config = new Configuration(); - - config.setString(MetricOptions.REPORTERS_LIST, "test"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); - - MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - assertTrue(metricRegistry.getReporters().size() == 1); - - Assert.assertTrue(TestReporter1.wasOpened); - - metricRegistry.shutdown(); - } - - /** - * Reporter that exposes whether open() was called. - */ - protected static class TestReporter1 extends TestReporter { - public static boolean wasOpened = false; - - @Override - public void open(MetricConfig config) { - wasOpened = true; - } - } - - /** - * Verifies that multiple reporters are instantiated correctly. - */ - @Test - public void testMultipleReporterInstantiation() { - Configuration config = new Configuration(); - - config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); - - MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - assertTrue(metricRegistry.getReporters().size() == 3); - - Assert.assertTrue(TestReporter11.wasOpened); - Assert.assertTrue(TestReporter12.wasOpened); - Assert.assertTrue(TestReporter13.wasOpened); - - metricRegistry.shutdown(); - } - - /** - * Reporter that exposes whether open() was called. 
- */ - protected static class TestReporter11 extends TestReporter { - public static boolean wasOpened = false; - - @Override - public void open(MetricConfig config) { - wasOpened = true; - } - } - - /** - * Reporter that exposes whether open() was called. - */ - protected static class TestReporter12 extends TestReporter { - public static boolean wasOpened = false; - - @Override - public void open(MetricConfig config) { - wasOpened = true; - } - } - - /** - * Reporter that exposes whether open() was called. - */ - protected static class TestReporter13 extends TestReporter { - public static boolean wasOpened = false; - - @Override - public void open(MetricConfig config) { - wasOpened = true; - } - } - - /** - * Verifies that configured arguments are properly forwarded to the reporter. - */ - @Test - public void testReporterArgumentForwarding() { - Configuration config = new Configuration(); - - config.setString(MetricOptions.REPORTERS_LIST, "test"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); - - new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown(); - - Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null)); - Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null)); - } - - /** - * Reporter that exposes the {@link MetricConfig} it was given. - */ - protected static class TestReporter2 extends TestReporter { - static MetricConfig mc; - @Override - public void open(MetricConfig config) { - mc = config; - } - } - - /** - * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. 
- * - * @throws InterruptedException - */ - @Test - public void testReporterScheduling() throws InterruptedException { - Configuration config = new Configuration(); - - config.setString(MetricOptions.REPORTERS_LIST, "test"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS"); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - long start = System.currentTimeMillis(); - - // only start counting from now on - TestReporter3.reportCount = 0; - - for (int x = 0; x < 10; x++) { - Thread.sleep(100); - int reportCount = TestReporter3.reportCount; - long curT = System.currentTimeMillis(); - /** - * Within a given time-frame T only T/50 reports may be triggered due to the interval between reports. - * This value however does not take the first triggered report into account (=> +1). - * Furthermore we have to account for the mis-alignment between reports being triggered and our time - * measurement (=> +1); for T=200 a total of 4-6 reports may have been - * triggered depending on whether the end of the interval for the first reports ends before - * or after T=50. - */ - long maxAllowedReports = (curT - start) / 50 + 2; - Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount); - } - Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); - - registry.shutdown(); - } - - /** - * Reporter that exposes how often report() was called. 
- */ - protected static class TestReporter3 extends TestReporter implements Scheduled { - public static int reportCount = 0; - - @Override - public void report() { - reportCount++; - } - } - - /** - * Verifies that reporters are notified of added/removed metrics. - */ - @Test - public void testReporterNotifications() { - Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); - root.counter("rootCounter"); - - assertTrue(TestReporter6.addedMetric instanceof Counter); - assertEquals("rootCounter", TestReporter6.addedMetricName); - - assertTrue(TestReporter7.addedMetric instanceof Counter); - assertEquals("rootCounter", TestReporter7.addedMetricName); - - root.close(); - - assertTrue(TestReporter6.removedMetric instanceof Counter); - assertEquals("rootCounter", TestReporter6.removedMetricName); - - assertTrue(TestReporter7.removedMetric instanceof Counter); - assertEquals("rootCounter", TestReporter7.removedMetricName); - - registry.shutdown(); - } - - /** - * Reporter that exposes the name and metric instance of the last metric that was added or removed. 
- */ - protected static class TestReporter6 extends TestReporter { - static Metric addedMetric; - static String addedMetricName; - - static Metric removedMetric; - static String removedMetricName; - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - addedMetric = metric; - addedMetricName = metricName; - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { - removedMetric = metric; - removedMetricName = metricName; - } - } - - /** - * Reporter that exposes the name and metric instance of the last metric that was added or removed. - */ - protected static class TestReporter7 extends TestReporter { - static Metric addedMetric; - static String addedMetricName; - - static Metric removedMetric; - static String removedMetricName; - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - addedMetric = metric; - addedMetricName = metricName; - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { - removedMetric = metric; - removedMetricName = metricName; - } - } - - /** - * Verifies that the scope configuration is properly extracted. 
- */ - @Test - public void testScopeConfig() { - Configuration config = new Configuration(); - - config.setString(MetricOptions.SCOPE_NAMING_TM, "A"); - config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B"); - config.setString(MetricOptions.SCOPE_NAMING_TASK, "C"); - config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D"); - - ScopeFormats scopeConfig = ScopeFormats.fromConfig(config); - - assertEquals("A", scopeConfig.getTaskManagerFormat().format()); - assertEquals("B", scopeConfig.getTaskManagerJobFormat().format()); - assertEquals("C", scopeConfig.getTaskFormat().format()); - assertEquals("D", scopeConfig.getOperatorFormat().format()); - } - - @Test - public void testConfigurableDelimiter() { - Configuration config = new Configuration(); - config.setString(MetricOptions.SCOPE_DELIMITER, "_"); - config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); - assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); - - registry.shutdown(); - } - - @Test - public void testConfigurableDelimiterForReporters() { - Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." 
+ ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter()); - assertEquals('_', registry.getDelimiter(0)); - assertEquals('-', registry.getDelimiter(1)); - assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2)); - assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3)); - assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1)); - - registry.shutdown(); - } - - @Test - public void testConfigurableDelimiterForReportersInGroup() { - Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); - config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B"); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - List<MetricReporter> reporters = registry.getReporters(); - ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter - ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter - ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter - ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter - - TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); - group.counter("C"); - group.close(); - registry.shutdown(); - assertEquals(4, TestReporter8.numCorrectDelimitersForRegister); - assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); - } - - /** - * Tests that the query actor will be stopped when the MetricRegistry is shut down. - */ - @Test - public void testQueryActorShutdown() throws Exception { - final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - - final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - - registry.startQueryService(actorSystem, null); - - ActorRef queryServiceActor = registry.getQueryService(); - - registry.shutdown(); - - try { - Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); - - fail("The query actor should be terminated resulting in a ActorNotFound exception."); - } catch (ActorNotFound e) { - // we expect the query actor to be shut down - } - } - - /** - * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier. 
- */ - public static class TestReporter8 extends TestReporter { - char expectedDelimiter; - public static int numCorrectDelimitersForRegister = 0; - public static int numCorrectDelimitersForUnregister = 0; - - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; - assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); - assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); - numCorrectDelimitersForRegister++; - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { - String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; - assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); - assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); - numCorrectDelimitersForUnregister++; - } - } - - @Test - public void testExceptionIsolation() throws Exception { - - Configuration config = new Configuration(); - config.setString(MetricOptions.REPORTERS_LIST, "test1,test2"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName()); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); - - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); - - Counter metric = new SimpleCounter(); - registry.register(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry)); - - assertEquals(metric, TestReporter7.addedMetric); - assertEquals("counter", TestReporter7.addedMetricName); - - registry.unregister(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry)); - - assertEquals(metric, TestReporter7.removedMetric); - assertEquals("counter", TestReporter7.removedMetricName); - - registry.shutdown(); - } - - /** - * Reporter that throws an exception when it is notified of an added or removed metric. - */ - protected static class FailingReporter extends TestReporter { - @Override - public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { - throw new RuntimeException(); - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { - throw new RuntimeException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java new file mode 100644 index 0000000..1140e3d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; + +/** + * Metric registry which does nothing and is intended for testing purposes. + */ +public class NoOpMetricRegistry implements MetricRegistry { + + final char delimiter = ','; + + final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration()); + + @Override + public char getDelimiter() { + return delimiter; + } + + @Override + public char getDelimiter(int index) { + return delimiter; + } + + @Override + public int getNumberReporters() { + return 0; + } + + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) {} + + @Override + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {} + + @Override + public ScopeFormats getScopeFormats() { + return scopeFormats; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index cea0928..31304e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -61,6 +61,9 @@ public class TaskManagerMetricsTest extends TestLogger { HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor()); + final MetricRegistryImpl metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(new Configuration())); + try { actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); @@ -73,6 +76,7 @@ public class TaskManagerMetricsTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1(); @@ -89,9 +93,9 @@ public class TaskManagerMetricsTest extends TestLogger { TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( - taskManagerServicesConfiguration, tmResourceID); - - final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry(); + taskManagerServicesConfiguration, + tmResourceID, + metricRegistry); // create the task manager final Props tmProps = TaskManager.getTaskManagerProps( @@ -103,7 +107,7 @@ public class TaskManagerMetricsTest extends TestLogger { taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, - tmRegistry); + taskManagerServices.getTaskManagerMetricGroup()); final ActorRef taskManager = actorSystem.actorOf(tmProps); @@ -135,7 +139,7 @@ public class TaskManagerMetricsTest extends TestLogger { }}; // verify that the registry was not shutdown due to the disconnect - 
Assert.assertFalse(tmRegistry.isShutdown()); + Assert.assertFalse(metricRegistry.isShutdown()); // shut down the actors and the actor system actorSystem.shutdown(); @@ -148,6 +152,8 @@ public class TaskManagerMetricsTest extends TestLogger { if (highAvailabilityServices != null) { highAvailabilityServices.closeAndCleanupAllData(); } + + metricRegistry.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 5c33ad6..55ba3a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -25,8 +25,8 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestingHistogram; import org.apache.flink.util.TestLogger; @@ -82,7 +82,7 @@ public class MetricQueryServiceTest extends TestLogger { } }; - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); 
MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 648ee47..8d91b81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -25,8 +25,8 @@ import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.TestReporter; @@ -45,7 +45,7 @@ public class AbstractMetricGroupTest { */ @Test public void testGetAllVariables() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) { @Override @@ -90,7 +90,7 @@ public class AbstractMetricGroupTest { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." 
+ ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!"); - MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); try { MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id"); tmGroup.counter("1"); @@ -180,7 +180,7 @@ public class AbstractMetricGroupTest { public void testScopeGenerationWithoutReporters() { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); - MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); try { TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id"); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 03341a6..05a72ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; @@ -45,7 +45,7 @@ public class JobManagerGroupTest extends TestLogger { @Test public void addAndRemoveJobs() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); final JobID jid1 = new JobID(); @@ -77,7 +77,7 @@ public class JobManagerGroupTest extends TestLogger { @Test public void testCloseClosesAll() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); final JobID jid1 = new JobID(); @@ -103,7 +103,7 @@ public class JobManagerGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); @@ -116,7 +116,7 @@ public class JobManagerGroupTest extends TestLogger { public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host"); @@ -128,7 +128,7 @@ public class JobManagerGroupTest extends TestLogger { @Test public void testCreateQueryServiceMetricInfo() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter()); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index d734dfd..4373f80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; @@ -39,7 +39,7 @@ public class JobManagerJobGroupTest extends 
TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -60,7 +60,7 @@ public class JobManagerJobGroupTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -83,7 +83,7 @@ public class JobManagerJobGroupTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -104,7 +104,7 @@ public class JobManagerJobGroupTest extends TestLogger { @Test public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index 56ce5fa..324bb73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -27,9 +27,10 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.util.TestReporter; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; @@ -39,7 +40,7 @@ import static org.junit.Assert.assertEquals; /** * Tests for the registration of groups and metrics on a {@link MetricGroup}. */ -public class MetricGroupRegistrationTest { +public class MetricGroupRegistrationTest extends TestLogger { /** * Verifies that group methods instantiate the correct metric with the given name. */ @@ -49,7 +50,7 @@ public class MetricGroupRegistrationTest { config.setString(MetricOptions.REPORTERS_LIST, "test"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); @@ -111,7 +112,7 @@ public class MetricGroupRegistrationTest { public void testDuplicateGroupName() { Configuration config = new Configuration(); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 633dbed..94760e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; @@ -48,13 +49,13 @@ public class MetricGroupTest extends TestLogger { private static final 
MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration(); - private MetricRegistry registry; + private MetricRegistryImpl registry; - private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry(); + private final MetricRegistryImpl exceptionOnRegister = new ExceptionOnRegisterRegistry(); @Before public void createRegistry() { - this.registry = new MetricRegistry(defaultMetricRegistryConfiguration); + this.registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration); } @After @@ -134,7 +135,7 @@ public class MetricGroupTest extends TestLogger { JobID jid = new JobID(); JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); - MetricRegistry registry = new MetricRegistry(defaultMetricRegistryConfiguration); + MetricRegistryImpl registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); @@ -156,7 +157,7 @@ public class MetricGroupTest extends TestLogger { // ------------------------------------------------------------------------ - private static class ExceptionOnRegisterRegistry extends MetricRegistry { + private static class ExceptionOnRegisterRegistry extends MetricRegistryImpl { public ExceptionOnRegisterRegistry() { super(defaultMetricRegistryConfiguration); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index 
4363a9d..820b73e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; @@ -47,7 +47,7 @@ public class OperatorGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -70,7 +70,7 @@ public class OperatorGroupTest extends TestLogger { public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); try { String tmID = "test-tm-id"; JobID jid = new JobID(); @@ -97,7 +97,7 @@ public class OperatorGroupTest extends TestLogger 
{ @Test public void testIOMetricGroupInstantiation() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -114,7 +114,7 @@ public class OperatorGroupTest extends TestLogger { @Test public void testVariables() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobID jid = new JobID(); JobVertexID tid = new JobVertexID(); @@ -156,7 +156,7 @@ public class OperatorGroupTest extends TestLogger { JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); OperatorID oid = new OperatorID(); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index bd85303..3272f73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; @@ -50,7 +50,7 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void addAndRemoveJobs() throws IOException { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -112,7 +112,7 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void testCloseClosesAll() throws IOException { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -152,7 +152,7 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void 
testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents()); @@ -164,7 +164,7 @@ public class TaskManagerGroupTest extends TestLogger { public void testGenerateScopeCustom() { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); @@ -174,7 +174,7 @@ public class TaskManagerGroupTest extends TestLogger { @Test public void testCreateQueryServiceMetricInfo() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());
