Repository: flink Updated Branches: refs/heads/master a7e0a277f -> 7fb7e0b97
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index fc0ce5c..b6be31c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; @@ -39,7 +39,7 @@ public class TaskManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -59,7 +59,7 @@ public class TaskManagerJobGroupTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.<job_name>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -81,7 +81,7 @@ public class TaskManagerJobGroupTest extends TestLogger { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); @@ -101,7 +101,7 @@ public class TaskManagerJobGroupTest extends TestLogger { @Test public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index 43cbbf1..be7407e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; @@ -49,7 +49,7 @@ public class TaskMetricGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobVertexID vertexId = new JobVertexID(); AbstractID executionId = new AbstractID(); @@ -73,7 +73,7 @@ public class TaskMetricGroupTest extends TestLogger { cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def"); cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); JobID jid = new JobID(); JobVertexID vertexId = new JobVertexID(); @@ -98,7 +98,7 @@ public class TaskMetricGroupTest extends TestLogger { public void testGenerateScopeWilcard() { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>"); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); AbstractID executionId = new AbstractID(); @@ -123,7 +123,7 @@ public class TaskMetricGroupTest extends TestLogger { JobID jid = new JobID(); JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); @@ -157,7 +157,7 @@ public class TaskMetricGroupTest extends TestLogger { public void testOperatorNameTruncation() { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME); - MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname"); TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0); @@ -170,7 +170,7 @@ public class TaskMetricGroupTest extends TestLogger { Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName); } - private static class CountingMetricRegistry extends MetricRegistry { + private static class CountingMetricRegistry extends MetricRegistryImpl { private int counter = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index 2ca19c1..3d29815 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -37,7 +37,7 @@ import java.util.UUID; public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { - private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); public UnregisteredTaskMetricsGroup() { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index d0dd973..306d4d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -78,7 +78,7 @@ public class ResourceManagerHATest extends TestLogger { highAvailabilityServices, rpcService.getScheduledExecutor()); - MetricRegistry metricRegistry = mock(MetricRegistry.class); + MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 73c5b5c..1b6324c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -270,7 +270,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), Time.seconds(5L)); - MetricRegistry metricRegistry = mock(MetricRegistry.class); + MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 0206ade..147d180 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -182,7 +182,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); - MetricRegistry metricRegistry = mock(MetricRegistry.class); + MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java index 8e76674..b600cbe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java @@ -105,7 +105,7 @@ public class TaskManagerLogHandlerTest { JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337)); when(jobManagerGateway.getHostname()).thenReturn("localhost"); - when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), any(Time.class))).thenReturn( + when(jobManagerGateway.requestTaskManagerInstance(any(ResourceID.class), any(Time.class))).thenReturn( CompletableFuture.completedFuture(Optional.of(taskManager))); GatewayRetriever<JobManagerGateway> retriever = mock(GatewayRetriever.class); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index ce98f31..7b10db6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -89,7 +89,7 @@ public class MetricFetcherTest extends TestLogger { when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn( CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath))); when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn( - CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmID, tmMetricQueryServicePath)))); + CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath)))); GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class); when(retriever.getNow()) @@ -99,7 +99,7 @@ public class MetricFetcherTest extends TestLogger { MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class); MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class); - MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID); + MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID); when(jmQueryService.queryMetrics(any(Time.class))) .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], 0, 0, 0, 0))); @@ -133,14 +133,14 @@ public class MetricFetcherTest extends TestLogger { assertEquals("0.99", store.getJobManagerMetricStore().getMetric("abc.hist_p99")); assertEquals("0.999", store.getJobManagerMetricStore().getMetric("abc.hist_p999")); - assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); + assertEquals("x", store.getTaskManagerMetricStore(tmRID.toString()).metrics.get("abc.gauge")); assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc")); assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc")); } } - private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(InstanceID tmID, JobID jobID) { + private static MetricDumpSerialization.MetricSerializationResult createRequestDumpAnswer(ResourceID tmRID, JobID jobID) { Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); @@ -178,7 +178,7 @@ public class MetricFetcherTest extends TestLogger { public String getValue() { return "x"; } - }, new Tuple2<>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); + }, new Tuple2<>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmRID.toString(), "abc"), "gauge")); histograms.put(new TestingHistogram(), new Tuple2<>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index e448ccc..7c6b7dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; @@ -112,7 +112,7 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); - MetricRegistry metricRegistry = mock(MetricRegistry.class); + MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class); HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS); final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e106238..fcd6e4e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -61,7 +61,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -205,7 +205,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -311,7 +311,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -429,7 +429,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -522,7 +522,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -605,7 +605,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -747,7 +747,7 @@ public class TaskExecutorTest extends TestLogger { networkEnvironment, haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -863,7 +863,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -981,7 +981,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1074,7 +1074,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1248,7 +1248,7 @@ public class TaskExecutorTest extends TestLogger { networkMock, haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1371,7 +1371,7 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServicesMock, heartbeatServicesMock, - mock(MetricRegistry.class), + mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java index bf90634..a8358a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java @@ -21,9 +21,9 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.util.EnvironmentInformation; + import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -259,7 +259,6 @@ public class TaskManagerServicesTest { managedMemory, false, managedMemoryFraction, - mock(MetricRegistryConfiguration.class), 0); } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 10b74c3..8249fca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -49,8 +49,9 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -62,7 +63,6 @@ import scala.Option; import scala.concurrent.duration.FiniteDuration; import java.net.InetAddress; -import java.util.Arrays; import java.util.concurrent.TimeUnit; public class TaskManagerComponentsStartupShutdownTest extends TestLogger { @@ -99,6 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1(); @@ -168,7 +169,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network, numberOfSlots, highAvailabilityServices, - new MetricRegistry(metricRegistryConfiguration)); + new MetricRegistryImpl(metricRegistryConfiguration)); taskManager = actorSystem.actorOf(tmProps); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index 22d49d2..7429ec5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -18,12 +18,9 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; @@ -32,17 +29,16 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; -import org.junit.Test; -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.junit.Test; import java.io.File; import java.io.IOException; @@ -50,6 +46,11 @@ import java.io.InputStream; import java.io.StringWriter; import java.util.concurrent.TimeUnit; +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.duration.FiniteDuration; + import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; @@ -121,6 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1; http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 67accdb..2e6c580 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.StartupUtils; import org.apache.flink.util.NetUtils; @@ -249,6 +250,7 @@ public class TaskManagerStartupTest extends TestLogger { ResourceID.generate(), null, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index f575867..887c4f5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance._ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} +import org.apache.flink.runtime.metrics.{MetricRegistryImpl, MetricRegistryConfiguration} import org.apache.flink.runtime.taskmanager.TaskManagerLocation import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils} @@ -60,6 +61,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor var highAvailabilityServices: HighAvailabilityServices = _ + val metricRegistry: MetricRegistryImpl = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(new Configuration()) + ) + val timeout = FiniteDuration(30, TimeUnit.SECONDS) override def afterAll(): Unit = { @@ -87,7 +92,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor var tm2Option: Option[ActorRef] = None try { - val jm = startTestingJobManager(_system, highAvailabilityServices) + val jm = startTestingJobManager(_system, highAvailabilityServices, metricRegistry) jmOption = Some(jm) val rm = startTestingResourceManager(_system, highAvailabilityServices) @@ -169,7 +174,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor try { val probe = TestProbe() - val jm = startTestingJobManager(_system, highAvailabilityServices) + val jm = startTestingJobManager(_system, highAvailabilityServices, metricRegistry) jmOption = Some(jm) val rm = startTestingResourceManager(_system, highAvailabilityServices) rmOption = Some(rm) @@ -242,7 +247,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor private def startTestingJobManager( system: ActorSystem, - highAvailabilityServices: HighAvailabilityServices): ActorGateway = { + highAvailabilityServices: HighAvailabilityServices, + metricRegistry: MetricRegistryImpl): ActorGateway = { val config = new Configuration() @@ -250,7 +256,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor config, executor, executor, - highAvailabilityServices.createBlobStore()) + highAvailabilityServices.createBlobStore(), + metricRegistry) // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService. // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus, @@ -273,7 +280,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor highAvailabilityServices.getSubmittedJobGraphStore(), highAvailabilityServices.getCheckpointRecoveryFactory(), components._9, - None, + components._10, None) _system.actorOf(props) http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 2e884a0..2b91cd4 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -42,7 +42,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.MetricRegistryImpl +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ @@ -120,7 +121,7 @@ class TestingCluster( submittedJobGraphStore: SubmittedJobGraphStore, checkpointRecoveryFactory: CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[MetricRegistry], + jobManagerMetricGroup: JobManagerMetricGroup, optRestAddress: Option[String]): Props = { val props = super.getJobManagerProps( @@ -139,7 +140,7 @@ class TestingCluster( submittedJobGraphStore, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) if (synchronousDispatcher) { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 2170a8c..13e2b75 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import scala.concurrent.duration._ import scala.language.postfixOps @@ -53,7 +53,7 @@ class TestingJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, jobRecoveryTimeout : FiniteDuration, - metricRegistry : Option[MetricRegistry], + jobManagerMetricGroup : JobManagerMetricGroup, optRestAddress: Option[String]) extends JobManager( flinkConfiguration, @@ -70,6 +70,6 @@ class TestingJobManager( submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, - metricRegistry, + jobManagerMetricGroup, optRestAddress) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 1db0a85..da753ae 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -23,7 +23,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -40,7 +40,7 @@ class TestingTaskManager( network: NetworkEnvironment, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, - metricRegistry : MetricRegistry) + taskManagerMetricGroup : TaskManagerMetricGroup) extends TaskManager( config, resourceID, @@ -50,7 +50,7 @@ class TestingTaskManager( network, numberOfSlots, highAvailabilityServices, - metricRegistry) + taskManagerMetricGroup) with TestingTaskManagerLike { def this( @@ -61,7 +61,7 @@ class TestingTaskManager( network: NetworkEnvironment, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, - metricRegistry : MetricRegistry) { + taskManagerMetricGroup : TaskManagerMetricGroup) { this( config, ResourceID.generate(), @@ -71,6 +71,6 @@ class TestingTaskManager( network, numberOfSlots, highAvailabilityServices, - metricRegistry) + taskManagerMetricGroup) } } http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index f78af9f..2de6f9e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -18,9 +18,10 @@ package org.apache.flink.runtime.testingUtils +import java.net.InetAddress import java.util -import java.util.{Collections, UUID} import java.util.concurrent._ +import java.util.{Collections, UUID} import akka.actor.{ActorRef, ActorSystem, Kill, Props} import akka.pattern.{Patterns, ask} @@ -38,13 +39,15 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} import org.apache.flink.runtime.jobmaster.JobMaster import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} +import org.apache.flink.runtime.metrics.{MetricRegistryImpl, MetricRegistryConfiguration} +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} -import scala.concurrent.duration.TimeUnit -import scala.concurrent.duration._ +import scala.concurrent.duration.{TimeUnit, _} import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} import scala.language.postfixOps @@ -266,11 +269,17 @@ object TestingUtils { resultingConfiguration.addAll(configuration) + val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(configuration)) + + val taskManagerResourceId = ResourceID.generate() + val taskManager = TaskManager.startTaskManagerComponentsAndActor( resultingConfiguration, - ResourceID.generate(), + taskManagerResourceId, actorSystem, highAvailabilityServices, + metricRegistry, "localhost", None, useLocalCommunication, @@ -471,12 +480,16 @@ object TestingUtils { HighAvailabilityOptions.HA_MODE, ConfigConstants.DEFAULT_HA_MODE) + val metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(configuration)) + val (actor, _) = JobManager.startJobManagerActors( configuration, actorSystem, futureExecutor, ioExecutor, highAvailabilityServices, + metricRegistry, None, Some(prefix + JobMaster.JOB_MANAGER_NAME), Some(prefix + JobMaster.ARCHIVE_NAME), http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index a78c528..94aed2a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -145,6 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 51868af..7c53d52 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; @@ -217,6 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index b575dca..ee37d6d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -203,6 +204,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { ResourceID.generate(), taskManagerSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 2820dd2..8e97e9d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -281,6 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { ResourceID.generate(), tmActorSystem[i], highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.<String>empty(), false, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index c70c2d5..ecd0bea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.NetUtils; @@ -120,6 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index e5131af..7488b62 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingJobManager; @@ -112,6 +113,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), highAvailabilityServices, + new NoOpMetricRegistry(), Option.empty(), Option.apply("jm"), Option.apply("arch"), @@ -133,6 +135,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { ResourceID.generate(), actorSystem, highAvailabilityServices, + new NoOpMetricRegistry(), "localhost", Option.apply("tm"), true, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 298cd52..d0084b6 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -30,7 +30,7 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.testingUtils.TestingJobManagerLike import scala.concurrent.duration.FiniteDuration @@ -68,7 +68,7 @@ class TestingYarnJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricRegistry : Option[MetricRegistry], + jobManagerMetricGroup : JobManagerMetricGroup, optRestAddress: Option[String]) extends YarnJobManager( flinkConfiguration, @@ -85,6 +85,6 @@ class TestingYarnJobManager( submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, - metricRegistry, + jobManagerMetricGroup, optRestAddress) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index a03f365..228eaaa 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -23,7 +23,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.TaskManagerLocation import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike @@ -52,7 +52,7 @@ class TestingYarnTaskManager( network: NetworkEnvironment, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, - metricRegistry : MetricRegistry) + taskManagerMetricGroup : TaskManagerMetricGroup) extends YarnTaskManager( config, resourceID, @@ -62,7 +62,7 @@ class TestingYarnTaskManager( network, numberOfSlots, highAvailabilityServices, - metricRegistry) + taskManagerMetricGroup) with TestingTaskManagerLike { object YarnTaskManager { http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index c101b75..95ba154 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -36,6 +36,8 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -219,6 +221,7 @@ public class YarnApplicationMasterRunner { ActorSystem actorSystem = null; WebMonitor webMonitor = null; HighAvailabilityServices highAvailabilityServices = null; + MetricRegistryImpl metricRegistry = null; int numberProcessors = Hardware.getNumberCPUCores(); @@ -357,6 +360,11 @@ public class YarnApplicationMasterRunner { new ScheduledExecutorServiceAdapter(futureExecutor), LOG); + metricRegistry = new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration(config)); + + metricRegistry.startQueryService(actorSystem, null); + // 2: the JobManager LOG.debug("Starting JobManager actor"); @@ -367,6 +375,7 @@ public class YarnApplicationMasterRunner { futureExecutor, ioExecutor, highAvailabilityServices, + metricRegistry, webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()), new Some<>(JobMaster.JOB_MANAGER_NAME), Option.<String>empty(), @@ -455,6 +464,10 @@ public class YarnApplicationMasterRunner { } } + if (metricRegistry != null) { + metricRegistry.shutdown(); + } + org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( AkkaUtils.getTimeout(config).toMillis(), TimeUnit.MILLISECONDS, http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index b32d25c..6feb287 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; @@ -116,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java index e1efb54..439fdf3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java index 042644b..a13f62c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 5909160..6b439bd 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -34,7 +34,8 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.MetricRegistryImpl +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.yarn.configuration.YarnConfigOptions import scala.concurrent.duration._ @@ -72,7 +73,7 @@ class YarnJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, jobRecoveryTimeout: FiniteDuration, - metricsRegistry: Option[MetricRegistry], + jobManagerMetricGroup: JobManagerMetricGroup, optRestAddress: Option[String]) extends ContaineredJobManager( flinkConfiguration, @@ -89,7 +90,7 @@ class YarnJobManager( submittedJobGraphs, checkpointRecoveryFactory, jobRecoveryTimeout, - metricsRegistry, + jobManagerMetricGroup, optRestAddress) { val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index e37ff6f..615466d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -23,7 +23,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} @@ -39,7 +39,7 @@ class YarnTaskManager( network: NetworkEnvironment, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, - metricRegistry : MetricRegistry) + taskManagerMetricGroup: TaskManagerMetricGroup) extends TaskManager( config, resourceID, @@ -49,7 +49,7 @@ class YarnTaskManager( network, numberOfSlots, highAvailabilityServices, - metricRegistry) { + taskManagerMetricGroup) { override def handleMessage: Receive = { super.handleMessage
