[
https://issues.apache.org/jira/browse/FLINK-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406144#comment-16406144
]
ASF GitHub Bot commented on FLINK-8942:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5699#discussion_r175726001
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
---
@@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
}
}
+ /**
+ * Tests that the heartbeat target {@link ResourceID} is properly
passed to the {@link HeartbeatListener} by the
+ * {@link HeartbeatManagerImpl}.
+ */
+ @Test
+ public void testHeartbeatManagerTargetPayload() {
+ final long heartbeatTimeout = 100L;
+
+ final ResourceID someTargetId = ResourceID.generate();
+ final ResourceID specialTargetId = ResourceID.generate();
+ final TargetDependentHeartbeatReceiver someHeartbeatTarget =
new TargetDependentHeartbeatReceiver();
+ final TargetDependentHeartbeatReceiver specialHeartbeatTarget =
new TargetDependentHeartbeatReceiver();
+
+ final int defaultResponse = 0;
+ final int specialResponse = 1;
+
+ HeartbeatManager<?, Integer> heartbeatManager = new
HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ ResourceID.generate(),
+ new TargetDependentHeartbeatSender(specialTargetId,
specialResponse, defaultResponse),
+ Executors.directExecutor(),
+ mock(ScheduledExecutor.class),
+ LOG);
+
+ try {
+ heartbeatManager.monitorTarget(someTargetId,
someHeartbeatTarget);
+ heartbeatManager.monitorTarget(specialTargetId,
specialHeartbeatTarget);
+
+ heartbeatManager.requestHeartbeat(someTargetId, null);
+ assertEquals(defaultResponse,
someHeartbeatTarget.getLastReceivedHeartbeatPayload());
+
+ heartbeatManager.requestHeartbeat(specialTargetId,
null);
+ assertEquals(specialResponse,
specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
+ } finally {
+ heartbeatManager.stop();
+ }
+ }
+
+ /**
+ * Tests that the heartbeat target {@link ResourceID} is properly
passed to the {@link HeartbeatListener} by the
+ * {@link HeartbeatManagerSenderImpl}.
+ */
+ @Test
+ public void testHeartbeatManagerSenderTargetPayload() throws Exception {
+ final long heartbeatTimeout = 100L;
+ final long heartbeatPeriod = 2000L;
--- End diff --
Let's set the period to `1L` to speed up the test a bit.
> Pass target ResourceID to HeartbeatListener#retrievePayload
> -----------------------------------------------------------
>
> Key: FLINK-8942
> URL: https://issues.apache.org/jira/browse/FLINK-8942
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>
> For FLINK-8881 we need a way to determine to which JobManager we are sending
> the heartbeats, otherwise we would have to send all accumulators, that is for
> all jobs, to each connected JobManager.
> I suggest to pass the targetĀ {{ResourceID}} to
> {{HeartbeatListener#retrievePayload}} which generates the payload.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)