[ https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356868#comment-15356868 ]
ASF GitHub Bot commented on FLINK-1550: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2146#discussion_r69107628 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java --- @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.HeapStateStore; +import org.apache.flink.runtime.checkpoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.instance.InstanceManager; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingMessages; +import org.apache.flink.runtime.testingUtils.TestingTaskManager; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Int; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB; +import static org.junit.Assert.assertEquals; + +public class JobManagerMetricTest { + + private static ActorSystem system; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** + * Tests that metrics registered on the JobManager are actually accessible. + * + * @throws Exception + */ + @Test + public void testJobManagerMetricAccess() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + int slots = 2; + ActorRef archive = null; + ActorRef jobManager = null; + ActorRef taskManager = null; + + flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>"); + + try { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + + InstanceManager instanceManager = new InstanceManager(); + instanceManager.addInstanceListener(scheduler); + + archive = system.actorOf(Props.create( + MemoryArchivist.class, + 10), "archive"); + + Props jobManagerProps = Props.create( + TestingJobManager.class, + flinkConfiguration, + new ForkJoinPool(), + instanceManager, + scheduler, + new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + archive, + new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), + timeout, + myLeaderElectionService, + new StandaloneSubmittedJobGraphStore(), + new StandaloneCheckpointRecoveryFactory(), + new SavepointStore(new HeapStateStore<CompletedCheckpoint>()), + jobRecoveryTimeout, + Option.apply(new MetricRegistry(flinkConfiguration))); + + jobManager = system.actorOf(jobManagerProps, "jobmanager"); + ActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID); + + taskManager = TaskManager.startTaskManagerComponentsAndActor( + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); + + ActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID); + Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()); + Await.ready(tmAlive, deadline.timeLeft()); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + sourceJobVertex.setParallelism(slots); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + 500, 500, 50, 5)); + + Future<Object> isLeader = gateway.ask( + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); + + Future<Object> isConnectedToJobManager = tmGateway.ask( + new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), + deadline.timeLeft()); + + // tell jobManager that he's the leader + myLeaderElectionService.isLeader(leaderSessionID); + // tell taskManager who's the leader + myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID); + + Await.ready(isLeader, deadline.timeLeft()); + Await.ready(isConnectedToJobManager, deadline.timeLeft()); --- End diff -- I've refactored the test case. > Show JVM Metrics for JobManager > ------------------------------- > > Key: FLINK-1550 > URL: https://issues.apache.org/jira/browse/FLINK-1550 > Project: Flink > Issue Type: Sub-task > Components: JobManager, Metrics > Reporter: Robert Metzger > Assignee: Chesnay Schepler > Fix For: pre-apache > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)