[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5699


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5699#discussion_r175732315
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
 ---
@@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
}
}
 
+   /**
+* Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+* {@link HeartbeatManagerImpl}.
+*/
+   @Test
+   public void testHeartbeatManagerTargetPayload() {
+   final long heartbeatTimeout = 100L;
+
+   final ResourceID someTargetId = ResourceID.generate();
+   final ResourceID specialTargetId = ResourceID.generate();
+   final TargetDependentHeartbeatReceiver someHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+   final TargetDependentHeartbeatReceiver specialHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+
+   final int defaultResponse = 0;
+   final int specialResponse = 1;
+
+   HeartbeatManager heartbeatManager = new 
HeartbeatManagerImpl<>(
+   heartbeatTimeout,
+   ResourceID.generate(),
+   new TargetDependentHeartbeatSender(specialTargetId, 
specialResponse, defaultResponse),
+   Executors.directExecutor(),
+   mock(ScheduledExecutor.class),
+   LOG);
+
+   try {
+   heartbeatManager.monitorTarget(someTargetId, 
someHeartbeatTarget);
+   heartbeatManager.monitorTarget(specialTargetId, 
specialHeartbeatTarget);
+
+   heartbeatManager.requestHeartbeat(someTargetId, null);
+   assertEquals(defaultResponse, 
someHeartbeatTarget.getLastReceivedHeartbeatPayload());
+
+   heartbeatManager.requestHeartbeat(specialTargetId, 
null);
+   assertEquals(specialResponse, 
specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
+   } finally {
+   heartbeatManager.stop();
+   }
+   }
+
+   /**
+* Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+* {@link HeartbeatManagerSenderImpl}.
+*/
+   @Test
+   public void testHeartbeatManagerSenderTargetPayload() throws Exception {
+   final long heartbeatTimeout = 100L;
+   final long heartbeatPeriod = 2000L;
--- End diff --

That shouldn't have an affect on the test time. The 
`HeartbeatManagerSenderImpl` sends one heartbeat right away without delay, and 
we only require on for each target.


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5699#discussion_r175730535
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 ---
@@ -106,8 +107,8 @@ Executor getExecutor() {
return heartbeatListener;
}
 
-   Collection 
getHeartbeatTargets() {
-   return heartbeatTargets.values();
+   Collection> 
getHeartbeatTargets() {
--- End diff --

yes that is a better solution.


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5699#discussion_r175726725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 ---
@@ -106,8 +107,8 @@ Executor getExecutor() {
return heartbeatListener;
}
 
-   Collection 
getHeartbeatTargets() {
-   return heartbeatTargets.values();
+   Collection> 
getHeartbeatTargets() {
--- End diff --

What about adding a `getTargetID()` to the `HeartbeatMonitor`? Then we 
would not have to expose, even though it is an internal method, that we use a 
`Map` to store the heartbeat targets. We could then keep the return type 
`Collection`.


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5699#discussion_r175726001
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.java
 ---
@@ -347,6 +348,162 @@ public void testLastHeartbeatFrom() {
}
}
 
+   /**
+* Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+* {@link HeartbeatManagerImpl}.
+*/
+   @Test
+   public void testHeartbeatManagerTargetPayload() {
+   final long heartbeatTimeout = 100L;
+
+   final ResourceID someTargetId = ResourceID.generate();
+   final ResourceID specialTargetId = ResourceID.generate();
+   final TargetDependentHeartbeatReceiver someHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+   final TargetDependentHeartbeatReceiver specialHeartbeatTarget = 
new TargetDependentHeartbeatReceiver();
+
+   final int defaultResponse = 0;
+   final int specialResponse = 1;
+
+   HeartbeatManager heartbeatManager = new 
HeartbeatManagerImpl<>(
+   heartbeatTimeout,
+   ResourceID.generate(),
+   new TargetDependentHeartbeatSender(specialTargetId, 
specialResponse, defaultResponse),
+   Executors.directExecutor(),
+   mock(ScheduledExecutor.class),
+   LOG);
+
+   try {
+   heartbeatManager.monitorTarget(someTargetId, 
someHeartbeatTarget);
+   heartbeatManager.monitorTarget(specialTargetId, 
specialHeartbeatTarget);
+
+   heartbeatManager.requestHeartbeat(someTargetId, null);
+   assertEquals(defaultResponse, 
someHeartbeatTarget.getLastReceivedHeartbeatPayload());
+
+   heartbeatManager.requestHeartbeat(specialTargetId, 
null);
+   assertEquals(specialResponse, 
specialHeartbeatTarget.getLastReceivedHeartbeatPayload());
+   } finally {
+   heartbeatManager.stop();
+   }
+   }
+
+   /**
+* Tests that the heartbeat target {@link ResourceID} is properly 
passed to the {@link HeartbeatListener} by the
+* {@link HeartbeatManagerSenderImpl}.
+*/
+   @Test
+   public void testHeartbeatManagerSenderTargetPayload() throws Exception {
+   final long heartbeatTimeout = 100L;
+   final long heartbeatPeriod = 2000L;
--- End diff --

Let's set the period to `1L` to speed up the test a bit.


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5699

[FLINK-8942][runtime] Pass heartbeat target ResourceID

## What is the purpose of the change

With this PR the heartbeat target `ResourceID` is passed to the 
`HeartbeatListener` when retrieving the payload so send. This allows the 
listener to create target-dependent payloads.

The primary use-case is FLINK-8881, where accumulators are sent via 
heartbeats to the JobManager. Here we only want to send accumulators for the 
relevant job, and not for all jobs.

## Brief change log

* add a `ResourceID` parameter to `HeartbeatListener#retrievePayload`
* modify return type of `HeartbeatManagerImpl#getHeartbeatTargets` to also 
contain the target `ResourceID`

## Verifying this change

This change added tests:

`HeartbeatManagerTest`:
* `testHeartbeatManagerTargetPayload`
* `testHeartbeatManagerSenderTargetPayload`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8942

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5699


commit d264dbbd260abc995ced8095a220260e8e0b3f1a
Author: zentol 
Date:   2018-03-14T13:21:27Z

[FLINK-8942][runtime] Pass heartbeat target ResourceID




---