[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882726#comment-15882726 ]
ASF GitHub Bot commented on FLINK-4364:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3151#discussion_r102762844
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atLeast;
+
+public class JobMasterTest extends TestLogger {
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Test
+ public void testHeartbeatTimeoutWithTaskManager() throws Exception {
+ final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService =
new TestingLeaderRetrievalService();
+
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+ final TestingFatalErrorHandler testingFatalErrorHandler = new
TestingFatalErrorHandler();
+
+ final String jobManagerAddress = "jm";
+ final UUID jmLeaderId = UUID.randomUUID();
+ final ResourceID jmResourceId = new
ResourceID(jobManagerAddress);
+
+ final String taskManagerAddress = "tm";
+ final ResourceID tmResourceId = new
ResourceID(taskManagerAddress);
+ final TaskManagerLocation taskManagerLocation = new
TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+ final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
+
+ final TestingSerialRpcService rpc = new
TestingSerialRpcService();
+ rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
+
+ final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(
+ Hardware.getNumberCPUCores(),
ExecutorThreadFactory.INSTANCE);
+
+ final long heartbeatInterval = 10L;
+ final long heartbeatTimeout = 1000L;
+ final HeartbeatManagerSenderImpl<Object, Object>
jmHeartbeatManager = new HeartbeatManagerSenderImpl<>(
+ heartbeatInterval,
+ heartbeatTimeout,
+ jmResourceId,
+ executorService,
+ rpc.getScheduledExecutor(),
+ log);
+
+ final JobVertex jobVertex = new JobVertex("NoOpInvokable");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobGraph jobGraph = new JobGraph("test", jobVertex);
+
+ final BlobLibraryCacheManager libraryCacheManager = new
BlobLibraryCacheManager(mock(BlobService.class), 1000000000L);
+ libraryCacheManager.registerJob(jobGraph.getJobID(),
jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
+
+ final MetricRegistry registry = new
MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ final JobManagerMetricGroup jmMetricGroup = new
JobManagerMetricGroup(registry, "host");
--- End diff --
I think you can also pass `null` to the `JobMaster` constructor.
> Implement TaskManager side of heartbeat from JobManager
> -------------------------------------------------------
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhijiang
> Assignee: zhijiang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and
> the {{TaskManager}} will report metrics info for each heartbeat.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)