http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index e4ceb40..72c03af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -18,15 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import akka.util.Timeout; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -42,25 +33,37 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.testingUtils.TestingJobManager; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.TestingServer; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + +import java.util.concurrent.TimeUnit; + import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; - public class JobManagerLeaderElectionTest extends TestLogger { @Rule @@ -198,7 +201,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { submittedJobGraphStore, checkpointRecoveryFactory, AkkaUtils.getDefaultTimeoutAsFiniteDuration(), - Option.<MetricRegistryImpl>empty(), + new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"), Option.<String>empty()); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java index 1140e3d..46d6548 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java @@ -23,6 +23,8 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import javax.annotation.Nullable; + /** * Metric registry which does nothing and is intended for testing purposes. */ @@ -57,4 +59,10 @@ public class NoOpMetricRegistry implements MetricRegistry { public ScopeFormats getScopeFormats() { return scopeFormats; } + + @Nullable + @Override + public String getMetricQueryServicePath() { + return null; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 31304e5..d934ea9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration; @@ -94,8 +96,12 @@ public class TaskManagerMetricsTest extends TestLogger { TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, - tmResourceID, - metricRegistry); + tmResourceID); + + TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( + metricRegistry, + taskManagerServices.getTaskManagerLocation(), + taskManagerServices.getNetworkEnvironment()); // create the task manager final Props tmProps = TaskManager.getTaskManagerProps( @@ -107,7 +113,7 @@ public class TaskManagerMetricsTest extends TestLogger { taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), highAvailabilityServices, - taskManagerServices.getTaskManagerMetricGroup()); + taskManagerMetricGroup); final ActorRef taskManager = actorSystem.actorOf(tmProps); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index 3d29815..7065e6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -22,22 +22,22 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.UUID; public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { - private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry(); public UnregisteredTaskMetricsGroup() { http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index 7b10db6..faa97a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -28,7 +28,6 @@ import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -72,8 +71,7 @@ public class MetricFetcherTest extends TestLogger { // ========= setup TaskManager ================================================================================= JobID jobID = new JobID(); - InstanceID tmID = new InstanceID(); - ResourceID tmRID = new ResourceID(tmID.toString()); + ResourceID tmRID = ResourceID.generate(); // ========= setup JobManager ================================================================================== JobDetails details = mock(JobDetails.class); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 7c6b7dd..1f1d09d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -153,7 +153,6 @@ public class TaskExecutorITCase extends TestLogger { networkEnvironment, testingHAServices, heartbeatServices, - metricRegistry, taskManagerMetricGroup, broadcastVariableManager, fileCache, http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index fcd6e4e..de807a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -45,8 +45,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; @@ -61,7 +61,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -114,7 +113,16 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TaskExecutorTest extends TestLogger { @@ -205,7 +213,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -311,7 +318,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -429,7 +435,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, heartbeatServices, - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -522,7 +527,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -605,7 +609,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -747,7 +750,6 @@ public class TaskExecutorTest extends TestLogger { networkEnvironment, haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -863,7 +865,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -981,7 +982,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1074,7 +1074,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1248,7 +1247,6 @@ public class TaskExecutorTest extends TestLogger { networkMock, haServices, mock(HeartbeatServices.class, RETURNS_MOCKS), - mock(MetricRegistryImpl.class), taskManagerMetricGroup, mock(BroadcastVariableManager.class), mock(FileCache.class), @@ -1371,7 +1369,6 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServicesMock, heartbeatServicesMock, - mock(MetricRegistryImpl.class), mock(TaskManagerMetricGroup.class), mock(BroadcastVariableManager.class), mock(FileCache.class), http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 8249fca..4b62770 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import static org.junit.Assert.*; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.actor.Props; -import akka.testkit.JavaTestKit; - import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; @@ -49,21 +41,28 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; - import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; import org.junit.Test; +import java.net.InetAddress; +import java.util.concurrent.TimeUnit; + import scala.Option; import scala.concurrent.duration.FiniteDuration; -import java.net.InetAddress; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; public class TaskManagerComponentsStartupShutdownTest extends TestLogger { @@ -169,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network, numberOfSlots, highAvailabilityServices, - new MetricRegistryImpl(metricRegistryConfiguration)); + new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString())); taskManager = actorSystem.actorOf(tmProps); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 2b91cd4..0369771 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.metrics.MetricRegistryImpl import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 75f0aa4..16e238b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -80,6 +80,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; + import org.junit.Assert; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 95ba154..3bdc2ac 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -465,7 +465,11 @@ public class YarnApplicationMasterRunner { } if (metricRegistry != null) { - metricRegistry.shutdown(); + try { + metricRegistry.shutdown(); + } catch (Throwable t) { + LOG.error("Could not properly shut down the metric registry.", t); + } } org.apache.flink.runtime.concurrent.Executors.gracefulShutdown( http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6feb287..b32d25c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; @@ -116,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { super( http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java index 439fdf3..e1efb54 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java index a13f62c..042644b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; @@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint { RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistryImpl metricRegistry, + MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler) throws Exception { final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
