[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2016-11-08 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2770

[FLINK-4354]Implement TaskManager side of heartbeat from ResourceManager

When the TaskManager registers at the new ResourceManager, the SlotReport is 
attached to the registration message. During heartbeating there is no need to 
exchange the SlotReport between the TaskManager and the ResourceManager, so the 
payload of the heartbeat message is null.

When the TaskManager's listener is notified of a heartbeat timeout from the 
ResourceManager, it currently does nothing in response to the notification. 
The TaskManager re-registers at the new ResourceManager through the HA mechanism.
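
To illustrate the split described above, here is a minimal sketch of ResourceManager-side bookkeeping in which only registration carries slot information and heartbeats merely refresh liveness. It is not the code of this pull request: `ResourceManagerSketch`, its methods, and the use of plain strings for resource IDs and slots are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical ResourceManager-side bookkeeping; all names are illustrative. */
class ResourceManagerSketch {
    private final Map<String, List<String>> slotsByTaskManager = new HashMap<>();
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();

    /** Registration is the only message that carries the slot report. */
    void registerTaskExecutor(String resourceId, List<String> slotReport, long nowMillis) {
        slotsByTaskManager.put(resourceId, slotReport);
        lastHeartbeatMillis.put(resourceId, nowMillis);
    }

    /** Heartbeats only refresh liveness; the payload is expected to be null and is ignored. */
    void heartbeatFromTaskManager(String resourceId, Object payload, long nowMillis) {
        if (slotsByTaskManager.containsKey(resourceId)) {
            lastHeartbeatMillis.put(resourceId, nowMillis);
        }
    }

    /** Slots known from the last registration, or an empty list for unknown IDs. */
    List<String> slotsOf(String resourceId) {
        return slotsByTaskManager.getOrDefault(resourceId, Collections.emptyList());
    }

    /** A task manager counts as timed out if it is unknown or its last heartbeat is too old. */
    boolean isTimedOut(String resourceId, long nowMillis, long timeoutMillis) {
        Long last = lastHeartbeatMillis.get(resourceId);
        return last == null || nowMillis - last > timeoutMillis;
    }
}
```

In this sketch a heartbeat with a null payload leaves the recorded slots untouched and only moves the timeout deadline forward.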

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4354

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2770


commit 233e63a8b92c68fc3aeeb7feae70c8e97a3e435e
Author: 淘江 
Date:   2016-11-08T10:16:00Z

[FLINK-4354]Implement TaskManager side of heartbeat from ResourceManager




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99586359
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
     * @param optionalDiagnostics
     */
    void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+   /**
+    * sends the heartbeat to resource manager from task manager
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information of the task manager
+    */
+   void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

Better to call this method `reportHeartbeatFromTaskManager` or 
`heartbeatFromTaskManager`. Maybe we could also introduce a dedicated interface 
for this which is inherited by the `ResourceManagerGateway`?
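
A minimal sketch of what such a dedicated interface could look like. The names `HeartbeatTargetSketch`, `ResourceManagerGatewaySketch`, and `RecordingResourceManager` are hypothetical, and strings stand in for Flink's `ResourceID` and `ApplicationStatus` so the example compiles on its own; it is an illustration of the suggestion, not the interface that was eventually merged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical dedicated interface for receiving heartbeats from task managers. */
interface HeartbeatTargetSketch<P> {
    void heartbeatFromTaskManager(String resourceId, P payload);
}

/** Stand-in for the gateway: it inherits the heartbeat method instead of declaring it. */
interface ResourceManagerGatewaySketch extends HeartbeatTargetSketch<Object> {
    void shutDownCluster(String finalStatus, String optionalDiagnostics);
}

/** Minimal in-memory implementation that records which task managers reported. */
class RecordingResourceManager implements ResourceManagerGatewaySketch {
    final List<String> heartbeats = new ArrayList<>();
    boolean shutDown = false;

    @Override
    public void heartbeatFromTaskManager(String resourceId, Object payload) {
        heartbeats.add(resourceId);
    }

    @Override
    public void shutDownCluster(String finalStatus, String optionalDiagnostics) {
        shutDown = true;
    }
}
```

Code that only needs to deliver heartbeats can then depend on the small interface rather than on the whole gateway.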




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99587910
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

Where is this executor shut down?
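
One way to answer this is to give the scheduler an explicit owner that shuts it down. The sketch below assumes a hypothetical `HeartbeatServicesSketch` class and a five-second grace period; neither comes from the pull request, and only standard `java.util.concurrent` calls are used.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of the heartbeat scheduler; it creates it and shuts it down. */
class HeartbeatServicesSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    ScheduledExecutorService scheduler() {
        return scheduler;
    }

    @Override
    public void close() {
        // Stop accepting new tasks, then wait briefly for pending ones to finish.
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Force shutdown and preserve the interrupt status for the caller.
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

The component that constructs the heartbeat manager would call `close()` from its own shutdown path, so the thread created by `newSingleThreadScheduledExecutor()` does not outlive it.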




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99585971
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
        final UUID resourceManagerLeaderId,
        final String taskExecutorAddress,
        final ResourceID resourceID,
-       final SlotReport slotReport) {
+       final SlotReport slotReport)
+   {
--- End diff --

Formatting changes are discouraged, especially if they change the style 
such that it diverges from the project's implicit norm.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99586748
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
@@ -129,4 +130,11 @@
     * @return Future acknowledge if the task is successfully canceled
     */
    Future cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+
+   /**
+    * request heartbeat from the resource manager
+    *
+    * @param resourceID unique id of the resource manager
+    */
+   void requestHeartbeatFromResourceManager(ResourceID resourceID);
--- End diff --

The same applies here, both for the naming and for moving this method 
into a dedicated heartbeat interface.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99588554
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1099,4 +1121,23 @@ public void run() {
            });
        }
    }
+
+   private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+       ResourceManagerHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(ResourceID resourceID) {
+       }
--- End diff --

What happens in case of a timeout?
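
For illustration, a timeout handler might ask the task executor to reconnect when the timed-out ID belongs to the ResourceManager it is currently connected to. This is only a sketch under that assumption: `HeartbeatListenerSketch`, `ReconnectOnTimeoutListener`, and the `Consumer`-based reconnect action are hypothetical names, with strings in place of `ResourceID`.

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical listener contract, loosely modeled on the method quoted in the diff. */
interface HeartbeatListenerSketch {
    void notifyHeartbeatTimeout(String resourceId);
}

/** Reacts to a ResourceManager heartbeat timeout by requesting a reconnect. */
final class ReconnectOnTimeoutListener implements HeartbeatListenerSketch {
    private final String currentResourceManagerId;
    private final Consumer<String> reconnectAction;

    ReconnectOnTimeoutListener(String currentResourceManagerId, Consumer<String> reconnectAction) {
        this.currentResourceManagerId = Objects.requireNonNull(currentResourceManagerId);
        this.reconnectAction = Objects.requireNonNull(reconnectAction);
    }

    @Override
    public void notifyHeartbeatTimeout(String resourceId) {
        // Ignore timeouts reported for a ResourceManager we are not connected to.
        if (currentResourceManagerId.equals(resourceId)) {
            reconnectAction.accept(resourceId);
        }
    }
}
```

Passing the reaction in as a callback keeps the listener free of connection logic, so the empty method body in the diff could be filled without coupling it to the rest of the task executor.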




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99585837
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -137,6 +140,7 @@ public ResourceManager(
        this.jobManagerRegistrations = new HashMap<>(4);
        this.taskExecutors = new HashMap<>(8);
        this.leaderSessionId = null;
+       this.resourceID = ResourceID.generate();
--- End diff --

I think the `resourceID` should be determined by the 
`ResourceManagerRunner`.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99736682
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
        final UUID resourceManagerLeaderId,
        final String taskExecutorAddress,
        final ResourceID resourceID,
-       final SlotReport slotReport) {
+       final SlotReport slotReport)
+   {
--- End diff --

That was probably an accidental change; I will be more careful next time.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737093
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -137,6 +140,7 @@ public ResourceManager(
        this.jobManagerRegistrations = new HashMap<>(4);
        this.taskExecutors = new HashMap<>(8);
        this.leaderSessionId = null;
+       this.resourceID = ResourceID.generate();
--- End diff --

Yes, in the current implementation it is determined by 
**ResourceManagerRunner**. This PR was submitted a long time ago, and some of 
the dependent work was not complete at that time.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737151
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
     * @param optionalDiagnostics
     */
    void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+   /**
+    * sends the heartbeat to resource manager from task manager
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information of the task manager
+    */
+   void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

I agree with this.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737204
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1099,4 +1121,23 @@ public void run() {
            });
        }
    }
+
+   private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+       ResourceManagerHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(ResourceID resourceID) {
+       }
--- End diff --

This will be added in the new modifications.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737255
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

This has already been added in the new modifications.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2770

