[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5699 ---
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5699#discussion_r175732315

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---
    @@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
             }
         }

    +    /**
    +     * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
    +     * {@link HeartbeatManagerImpl}.
    +     */
    +    @Test
    +    public void testHeartbeatManagerTargetPayload() {
    +        final long heartbeatTimeout = 100L;
    +
    +        final ResourceID someTargetId = ResourceID.generate();
    +        final ResourceID specialTargetId = ResourceID.generate();
    +        final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver();
    +        final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver();
    +
    +        final int defaultResponse = 0;
    +        final int specialResponse = 1;
    +
    +        HeartbeatManager heartbeatManager = new HeartbeatManagerImpl<>(
    +            heartbeatTimeout,
    +            ResourceID.generate(),
    +            new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
    +            Executors.directExecutor(),
    +            mock(ScheduledExecutor.class),
    +            LOG);
    +
    +        try {
    +            heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
    +            heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);
    +
    +            heartbeatManager.requestHeartbeat(someTargetId, null);
    +            assertEquals(defaultResponse, someHeartbeatTarget.getLastReceivedHeartbeatPayload());
    +
    +            heartbeatManager.requestHeartbeat(specialTargetId, null);
    +            assertEquals(specialResponse, specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
    +        } finally {
    +            heartbeatManager.stop();
    +        }
    +    }
    +
    +    /**
    +     * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
    +     * {@link HeartbeatManagerSenderImpl}.
    +     */
    +    @Test
    +    public void testHeartbeatManagerSenderTargetPayload() throws Exception {
    +        final long heartbeatTimeout = 100L;
    +        final long heartbeatPeriod = 2000L;
    --- End diff --

    That shouldn't have an effect on the test time. The `HeartbeatManagerSenderImpl` sends one heartbeat right away without delay, and we only require one for each target.

---
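To illustrate the point about the period not affecting test time, here is a minimal sketch using plain `java.util.concurrent`. It is not Flink's `HeartbeatManagerSenderImpl`; the class `FirstBeatDemo` and its method are hypothetical names, and the only assumption carried over from the comment is that the first heartbeat is scheduled with zero initial delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Flink code.
final class FirstBeatDemo {

    // Schedules a periodic "heartbeat" with zero initial delay and reports
    // whether the first one fired within waitMillis.
    static boolean firstBeatArrivesWithin(long periodMillis, long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch firstBeat = new CountDownLatch(1);
        try {
            // Initial delay 0: the first run does not wait for a full period.
            scheduler.scheduleAtFixedRate(firstBeat::countDown, 0L, periodMillis, TimeUnit.MILLISECONDS);
            return firstBeat.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With a period of `2000L`, a test that only needs the first heartbeat per target would still finish well before the period elapses under this scheduling assumption.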
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5699#discussion_r175730535 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java --- @@ -106,8 +107,8 @@ Executor getExecutor() { return heartbeatListener; } - CollectiongetHeartbeatTargets() { - return heartbeatTargets.values(); + Collection > getHeartbeatTargets() { --- End diff -- yes that is a better solution. ---
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5699#discussion_r175726725 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java --- @@ -106,8 +107,8 @@ Executor getExecutor() { return heartbeatListener; } - CollectiongetHeartbeatTargets() { - return heartbeatTargets.values(); + Collection > getHeartbeatTargets() { --- End diff -- What about adding a `getTargetID()` to the `HeartbeatMonitor`? Then we would not have to expose, even though it is an internal method, that we use a `Map` to store the heartbeat targets. We could then keep the return type `Collection`. ---
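The suggestion above could look roughly like the following sketch. `MonitorSketch` and `ManagerSketch` are hypothetical, simplified stand-ins for `HeartbeatMonitor` and `HeartbeatManagerImpl`, and a `String` replaces `ResourceID`; only the `getTargetID()` accessor name is taken from the comment.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for HeartbeatMonitor: it remembers which target it watches.
final class MonitorSketch {
    private final String targetId;

    MonitorSketch(String targetId) {
        this.targetId = targetId;
    }

    // Accessor proposed in the review comment.
    String getTargetID() {
        return targetId;
    }
}

// Hypothetical stand-in for the manager: the Map stays an internal detail.
final class ManagerSketch {
    private final Map<String, MonitorSketch> heartbeatTargets = new ConcurrentHashMap<>();

    void monitorTarget(String targetId) {
        heartbeatTargets.put(targetId, new MonitorSketch(targetId));
    }

    // Callers get plain monitors and read the ID from each one,
    // so the return type does not reveal the backing Map.
    Collection<MonitorSketch> getHeartbeatTargets() {
        return heartbeatTargets.values();
    }
}
```

A caller that needs the ID per target can then iterate the collection and call `getTargetID()` on each monitor instead of unpacking map entries.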
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5699#discussion_r175726001

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java ---
    @@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
             }
         }

    +    /**
    +     * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
    +     * {@link HeartbeatManagerImpl}.
    +     */
    +    @Test
    +    public void testHeartbeatManagerTargetPayload() {
    +        final long heartbeatTimeout = 100L;
    +
    +        final ResourceID someTargetId = ResourceID.generate();
    +        final ResourceID specialTargetId = ResourceID.generate();
    +        final TargetDependentHeartbeatReceiver someHeartbeatTarget = new TargetDependentHeartbeatReceiver();
    +        final TargetDependentHeartbeatReceiver specialHeartbeatTarget = new TargetDependentHeartbeatReceiver();
    +
    +        final int defaultResponse = 0;
    +        final int specialResponse = 1;
    +
    +        HeartbeatManager heartbeatManager = new HeartbeatManagerImpl<>(
    +            heartbeatTimeout,
    +            ResourceID.generate(),
    +            new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
    +            Executors.directExecutor(),
    +            mock(ScheduledExecutor.class),
    +            LOG);
    +
    +        try {
    +            heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
    +            heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);
    +
    +            heartbeatManager.requestHeartbeat(someTargetId, null);
    +            assertEquals(defaultResponse, someHeartbeatTarget.getLastReceivedHeartbeatPayload());
    +
    +            heartbeatManager.requestHeartbeat(specialTargetId, null);
    +            assertEquals(specialResponse, specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
    +        } finally {
    +            heartbeatManager.stop();
    +        }
    +    }
    +
    +    /**
    +     * Tests that the heartbeat target {@link ResourceID} is properly passed to the {@link HeartbeatListener} by the
    +     * {@link HeartbeatManagerSenderImpl}.
    +     */
    +    @Test
    +    public void testHeartbeatManagerSenderTargetPayload() throws Exception {
    +        final long heartbeatTimeout = 100L;
    +        final long heartbeatPeriod = 2000L;
    --- End diff --

    Let's set the period to `1L` to speed up the test a bit.

---
[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...
GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5699

    [FLINK-8942][runtime] Pass heartbeat target ResourceID

    ## What is the purpose of the change

    With this PR the heartbeat target `ResourceID` is passed to the `HeartbeatListener` when retrieving the payload to send. This allows the listener to create target-dependent payloads.

    The primary use-case is FLINK-8881, where accumulators are sent via heartbeats to the JobManager. Here we only want to send accumulators for the relevant job, and not for all jobs.

    ## Brief change log

    * add a `ResourceID` parameter to `HeartbeatListener#retrievePayload`
    * modify return type of `HeartbeatManagerImpl#getHeartbeatTargets` to also contain the target `ResourceID`

    ## Verifying this change

    This change added tests:

    `HeartbeatManagerTest`:
    * `testHeartbeatManagerTargetPayload`
    * `testHeartbeatManagerSenderTargetPayload`

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8942

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5699

commit d264dbbd260abc995ced8095a220260e8e0b3f1a
Author: zentol
Date:   2018-03-14T13:21:27Z

    [FLINK-8942][runtime] Pass heartbeat target ResourceID

---
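As a rough illustration of what a target-dependent payload means, the sketch below mirrors the idea behind the `TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse)` used in the tests. All types here are hypothetical simplifications: `TargetId` stands in for `ResourceID`, and `PayloadListener` reduces `HeartbeatListener` to the single payload-retrieval method with the new target parameter. It is not the code from the PR.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's ResourceID; only equality matters here.
final class TargetId {
    private final String value;

    TargetId(String value) {
        this.value = Objects.requireNonNull(value);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof TargetId && ((TargetId) other).value.equals(value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }
}

// Simplified listener: payload retrieval now receives the heartbeat target.
interface PayloadListener<O> {
    O retrievePayload(TargetId target);
}

// Returns a special payload for one target and a default payload for all others.
final class TargetDependentListener implements PayloadListener<Integer> {
    private final TargetId specialTarget;
    private final int specialResponse;
    private final int defaultResponse;

    TargetDependentListener(TargetId specialTarget, int specialResponse, int defaultResponse) {
        this.specialTarget = specialTarget;
        this.specialResponse = specialResponse;
        this.defaultResponse = defaultResponse;
    }

    @Override
    public Integer retrievePayload(TargetId target) {
        return specialTarget.equals(target) ? specialResponse : defaultResponse;
    }
}
```

In the FLINK-8881 use-case described above, the same shape would presumably let a listener pick only the accumulators belonging to the job behind the given target, rather than returning one payload for every target.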