[GitHub] flink pull request #2141: [flink-4021] Problem of setting autoread for netty...

2016-06-21 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2141

[flink-4021] Problem of setting autoread for netty channel when more ta…

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

…sks sharing the same Tcp connection

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2141


commit 7941150cc153d694b543dc92a36cc089eafcdbac
Author: 淘江 
Date:   2016-06-21T09:13:37Z

flink-4021:Problem of setting autoread for netty channel when more tasks 
sharing the same Tcp connection




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-28 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2461




[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-28 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, thank you for merging and for your help. If there is any 
follow-up work related to the TaskManager, feel free to assign it to me; I am 
happy to take it on.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2016-11-08 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2770

[FLINK-4354]Implement TaskManager side of heartbeat from ResourceManager

When the TaskManager registers at a new ResourceManager, the SlotReport is 
attached to the registration message. During heartbeating there is no need to 
exchange the SlotReport between TaskManager and ResourceManager, so the payload 
of the heartbeat message is null.

When the TaskManager's listener is notified of a heartbeat timeout from the 
ResourceManager, it currently does nothing in the event notification. The 
TaskManager re-registers at the new ResourceManager through the HA mechanism.
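
The split between registration and heartbeat messages can be sketched roughly as 
follows. This is only an illustration under stated assumptions: every class name 
below (`SlotReportSketch`, `RegistrationSketch`, `HeartbeatSketch`) is 
hypothetical and is not the code in this pull request.

```java
/** Hypothetical stand-in for the slot report; not Flink's actual SlotReport class. */
final class SlotReportSketch {
    final String[] slotIds;

    SlotReportSketch(String... slotIds) {
        this.slotIds = slotIds.clone();
    }
}

/** Hypothetical registration message: the slot report travels here, once. */
final class RegistrationSketch {
    final String taskManagerId;
    final SlotReportSketch slotReport;

    RegistrationSketch(String taskManagerId, SlotReportSketch slotReport) {
        if (slotReport == null) {
            throw new NullPointerException("registration must carry a slot report");
        }
        this.taskManagerId = taskManagerId;
        this.slotReport = slotReport;
    }
}

/** Hypothetical heartbeat message with a generic payload that may be null. */
final class HeartbeatSketch<P> {
    final String senderId;
    final P payload;

    HeartbeatSketch(String senderId, P payload) {
        this.senderId = senderId;
        this.payload = payload;
    }

    /** Heartbeat as described above: liveness only, no slot report attached. */
    static HeartbeatSketch<Void> withoutPayload(String senderId) {
        return new HeartbeatSketch<>(senderId, null);
    }
}
```

Keeping the payload generic leaves room to attach data to heartbeats later 
without changing the message type.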

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4354

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2770


commit 233e63a8b92c68fc3aeeb7feae70c8e97a3e435e
Author: 淘江 
Date:   2016-11-08T10:16:00Z

[FLINK-4354]Implement TaskManager side of heartbeat from ResourceManager






[GitHub] flink pull request #2798: [FLINK-4364]Implement TaskManager side of heartbea...

2016-11-14 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2798

[FLINK-4364]Implement TaskManager side of heartbeat from JobManager

1. **TaskManagerRunner** creates separate **HeartbeatManagerImpl** components 
for monitoring the **JobManager** and the **ResourceManager** when it constructs 
the **TaskManager**.
2. The **TaskManager** begins to monitor the **JobManager** once registration 
at the new job leader has succeeded.
3. Currently the payload is null for the interaction between 
**TaskManager** and **JobManager**; it can be extended later if needed.
4. The **JobManager** side will be implemented in a separate JIRA issue.
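
A minimal sketch of points 1 to 3, assuming a simplified monitor that tracks a 
single target and takes explicit timestamps. `HeartbeatMonitorSketch` and 
`TaskManagerHeartbeatsSketch` are hypothetical names and do not reflect the 
actual **HeartbeatManagerImpl** API.

```java
/** Hypothetical single-target monitor; not Flink's HeartbeatManagerImpl. */
final class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private boolean monitoring;
    private long lastHeartbeatMillis;

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called once registration at the target has succeeded. */
    void startMonitoring(long nowMillis) {
        monitoring = true;
        lastHeartbeatMillis = nowMillis;
    }

    /** Records a heartbeat; the payload is ignored because it is currently null. */
    void receiveHeartbeat(long nowMillis, Object payload) {
        if (monitoring) {
            lastHeartbeatMillis = nowMillis;
        }
    }

    /** Returns true if the target has timed out, and stops monitoring it. */
    boolean checkTimeout(long nowMillis) {
        if (monitoring && nowMillis - lastHeartbeatMillis > timeoutMillis) {
            monitoring = false;
            return true;
        }
        return false;
    }

    boolean isMonitoring() {
        return monitoring;
    }
}

/** Hypothetical holder: one monitor per remote component, created up front. */
final class TaskManagerHeartbeatsSketch {
    final HeartbeatMonitorSketch jobManagerMonitor;
    final HeartbeatMonitorSketch resourceManagerMonitor;

    TaskManagerHeartbeatsSketch(long timeoutMillis) {
        this.jobManagerMonitor = new HeartbeatMonitorSketch(timeoutMillis);
        this.resourceManagerMonitor = new HeartbeatMonitorSketch(timeoutMillis);
    }

    /** Monitoring of the JobManager starts only after successful registration. */
    void onJobLeaderRegistrationSuccess(long nowMillis) {
        jobManagerMonitor.startMonitoring(nowMillis);
    }
}
```

Passing the current time in as an argument is a choice made for this sketch so 
that timeouts can be exercised deterministically in a test.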

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2798.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2798


commit ae93a8fa3f29a1d00c1864575f030f0177cf73f1
Author: 淘江 
Date:   2016-11-14T09:54:58Z

[FLINK-4364]Implement TaskManager side of heartbeat from JobManager






[GitHub] flink pull request #2906: [FLINK-5132][FLINK-5133][FLINK-5134][Core,DataStre...

2016-11-30 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2906

[FLINK-5132][FLINK-5133][FLINK-5134][Core,DataStream API,DataSet API] 
Fine-grained Resource Configuration

Introduce the **ResourceSpec** structure to describe the different resource 
factors.
Add a new **setResource** API for both **DataStream** and **DataSet**.
During the internal construction of the **StreamGraph** and **JobGraph** for 
DataStream, set the corresponding resources and merge them for the 
**JobVertex**; the same is done in **Plan** and **OptimizedPlan** for 
**DataSet**.
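
To illustrate what merging resources for a **JobVertex** could look like, here 
is a small sketch. The class name `ResourceSpecSketch`, its fields, and the 
choice of summing each factor are assumptions made for illustration, not the 
implementation in this pull request.

```java
/** Hypothetical value class; the fields and merge rule are assumptions. */
final class ResourceSpecSketch {
    final double cpuCores;
    final int heapMemoryMb;
    final int directMemoryMb;
    final int nativeMemoryMb;

    ResourceSpecSketch(double cpuCores, int heapMemoryMb, int directMemoryMb, int nativeMemoryMb) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        this.directMemoryMb = directMemoryMb;
        this.nativeMemoryMb = nativeMemoryMb;
    }

    /**
     * Combines the resources of two operators that end up in the same vertex.
     * Summing every factor is an assumption of this sketch.
     */
    ResourceSpecSketch merge(ResourceSpecSketch other) {
        return new ResourceSpecSketch(
                cpuCores + other.cpuCores,
                heapMemoryMb + other.heapMemoryMb,
                directMemoryMb + other.directMemoryMb,
                nativeMemoryMb + other.nativeMemoryMb);
    }
}
```

Making the class immutable means `merge` returns a new instance, so the spec 
set on each operator stays unchanged while the vertex gets the combined value.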

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink jira-5132

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2906


commit 73e7cca3b3540b877bf1c03d18fc77d6b69dc867
Author: 淘江 
Date:   2016-11-28T10:07:15Z

[FLINK-5132][FLINK-5133][FLINK-5134][Core,DataStream API,DataSet API] 
Fine-grained Resource Configuration

Summary: above

Test Plan: NA

Reviewers: yushi.wxg

Subscribers: #blink

Differential Revision: http://phabricator.taobao.net/D6593

commit 02b0e9d71db86921485ee27dbbb588f6c48fd19e
Author: 淘江 
Date:   2016-11-30T09:13:32Z

[FLINK-5132][DataSet API,DataStream API] Introduce the ResourceSpec API for 
DataSet and DataStream

commit ceed4cb5a9798fb628e82511c08b70636acae9b0
Author: 淘江 
Date:   2016-11-30T09:43:22Z

[FLINK-5132][DataSet API,DataStream API] Introduce the ResourceSpec API for 
DataSet and DataStream






[GitHub] flink pull request #2907: [FLINK-5135][JM]ResourceProfile for slot request s...

2016-11-30 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2907

[FLINK-5135][JM]ResourceProfile for slot request should be expanded to 
correspond with ResourceSpec

Currently the **ResourceProfile** only contains CPU cores and memory 
properties. The memory should be split into different types such as heap 
memory, direct memory and native memory, corresponding to the memory types in 
**ResourceSpec**.

This PR also contains the not-yet-merged code for [FLINK-5132] [FLINK-5133] 
[FLINK-5134]. The modifications for [FLINK-5135] itself only include 
**SlotPool** and **ResourceProfile**.
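
One way to picture the expanded profile is a per-dimension comparison between 
what a slot offers and what a request needs. The sketch below is an assumption 
for illustration only: `ResourceProfileSketch` and `canSatisfy` are hypothetical 
names, and the matching rule is not taken from this pull request.

```java
/** Hypothetical profile with memory split by type; not Flink's ResourceProfile. */
final class ResourceProfileSketch {
    final double cpuCores;
    final int heapMemoryMb;
    final int directMemoryMb;
    final int nativeMemoryMb;

    ResourceProfileSketch(double cpuCores, int heapMemoryMb, int directMemoryMb, int nativeMemoryMb) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        this.directMemoryMb = directMemoryMb;
        this.nativeMemoryMb = nativeMemoryMb;
    }

    /** A slot can serve a request only if it is large enough in every dimension. */
    boolean canSatisfy(ResourceProfileSketch required) {
        return cpuCores >= required.cpuCores
                && heapMemoryMb >= required.heapMemoryMb
                && directMemoryMb >= required.directMemoryMb
                && nativeMemoryMb >= required.nativeMemoryMb;
    }
}
```

With a single memory figure, a slot with plenty of heap but no direct memory 
would look sufficient for a request that needs direct memory; comparing each 
type separately avoids that.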

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink jira-5135

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2907


commit 73e7cca3b3540b877bf1c03d18fc77d6b69dc867
Author: 淘江 
Date:   2016-11-28T10:07:15Z

[FLINK-5132][FLINK-5133][FLINK-5134][Core,DataStream API,DataSet API] 
Fine-grained Resource Configuration

Summary: above

Test Plan: NA

Reviewers: yushi.wxg

Subscribers: #blink

Differential Revision: http://phabricator.taobao.net/D6593

commit 02b0e9d71db86921485ee27dbbb588f6c48fd19e
Author: 淘江 
Date:   2016-11-30T09:13:32Z

[FLINK-5132][DataSet API,DataStream API] Introduce the ResourceSpec API for 
DataSet and DataStream

commit ceed4cb5a9798fb628e82511c08b70636acae9b0
Author: 淘江 
Date:   2016-11-30T09:43:22Z

[FLINK-5132][DataSet API,DataStream API] Introduce the ResourceSpec API for 
DataSet and DataStream

commit 6021b056f4de8bb2e74ff6709326b58e69547404
Author: 淘江 
Date:   2016-11-30T09:55:10Z

[FLINK-5132][DataSet API,DataStream API] Introduce the ResourceSpec API for 
DataSet and DataStream

commit 3ccd36033ff9b55c5f7ec7b4afbbefd261c85d61
Author: 淘江 
Date:   2016-11-30T10:01:16Z

[FLINK-5135][JM]ResourceProfile for slot request should be expanded to 
correspond with ResourceSpec






[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-21 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2400

[FLINK-4363] Implement TaskManager basic startup of all components in java


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4363

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2400


commit afc504f37ec873b3be30320c3bcbc93fcbe1aec3
Author: 淘江 
Date:   2016-08-19T09:32:10Z

implement taskmanager startup basic components in java [#FLINK-4363]

commit bba0f679d3b2f7b779c18b13fefd4b50edbdfb69
Author: 淘江 
Date:   2016-08-22T03:13:02Z

Merge branch 'flip-6' of https://github.com/apache/flink into 
flink-tm_startup

commit 22da4e24e155f309d2705bbb5a6d1a64744dc4e3
Author: 淘江 
Date:   2016-08-22T03:52:05Z

[FLINK-4363] Implement TaskManager basic startup of all components in java






[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648894
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
    private final ResourceID resourceID;
 
    /** The access to the leader election and metadata storage services */
    private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
    // 
 
    public TaskExecutor(
+           TaskManagerConfiguration taskManagerConfig,
+           ResourceID resourceID,
+           InstanceConnectionInfo connectionInfo,
+           MemoryManager memoryManager,
+           IOManager ioManager,
+           NetworkEnvironment networkEnvironment,
+           int numberOfSlots,
            RpcService rpcService,
-           HighAvailabilityServices haServices,
-           ResourceID resourceID) {
+           HighAvailabilityServices haServices) {
 
        super(rpcService);
 
-       this.haServices = checkNotNull(haServices);
+       this.taskManagerConfig = checkNotNull(taskManagerConfig);
        this.resourceID = checkNotNull(resourceID);
+       this.connectionInfo = checkNotNull(connectionInfo);
+       this.memoryManager = checkNotNull(memoryManager);
+       this.ioManager = checkNotNull(ioManager);
+       this.networkEnvironment = checkNotNull(networkEnvironment);
+       this.numberOfSlots = checkNotNull(numberOfSlots);
+       this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+    * Starts and runs the TaskManager.
+    * 
+    * This method first tries to select the network interface to use for the TaskManager
+    * communication. The network interface is used both for the actor communication
+    * (coordination) as well as for the data exchange between task managers. Unless
+    * the hostname/interface is explicitly configured in the configuration, this
+    * method will try out various interfaces and methods to connect to the JobManager
+    * and select the one where the connection attempt is successful.
+    * 
+    * After selecting the network interface, this method brings up an actor system
+    * for the TaskManager and its actors, starts the TaskManager's services
+    * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+    *
+    * @param configuration    The configuration for the TaskManager.
+    * @param taskManagerClass The actor class to instantiate.
+    *                         Allows to use TaskManager subclasses for example for YARN.
+    */
+   public static void selectNetworkInterfaceAndRunTaskManager(
+           Configuration configuration,
+           ResourceID resourceID,
+           Class taskManagerClass) throws Exception {
+
+       Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+       runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+   }
+
+   private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+           throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75649383
  

It is better to 

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75650354
  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75651298
  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75651382
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception {
+
+   Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+   }
+
+   private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75652404
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
--- End diff --

Yes, the initial reason for this is that all the inner names are still called 
"TaskManager", so the TaskExecutor class can easily be renamed to "TaskManager" 
when it finally replaces it. Otherwise we would have to replace every 
"taskexecutor" with "taskmanager" after the final replacement.




[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2400
  
Thank you for your review and advice, mxm. Another concern is whether to 
modify the parameters of the methods "selectNetworkInterfaceAndRunTaskManager" 
and "startTaskManagerComponentsAndActor", which are invoked by 
"YarnTMRunner" and "LocalFlinkMiniCluster". The main difference for the new TM 
is that it uses "RPCService" and "HighAvailabilityServices" instead of 
"ActorSystem" and "LeaderRetrievalService". My current implementation only 
supports these new components in the constructor, for easy testing, and retains 
the previous methods and parameters. Should we go on to modify the parts that 
use "ActorSystem" and "LeaderRetrievalService" next, or leave them as they are 
for now?




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75805009
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-23 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2400
  
Hi @mxm, I pushed some modifications based on your comments, including:
1. Make the variables final.
2. Implement the TaskExecutorConfiguration class in Java instead of 
TaskManagerConfiguration in Scala. NetworkEnvironmentConfiguration should 
also be rewritten in Java, but that would cause import errors in 
NetworkEnvironment, so I retained the current implementation.
3. Replace the Scala tuple2 with InetAddress, and the Scala 
tuple4 with org.apache.flink.api.java.tuple.Tuple4.
4. Remove the throws clause from the checkConfigParameter method.
5. There are two public static methods, 
"selectNetworkInterfaceAndRunTaskManager" and 
"startTaskManagerComponentsAndActor", that can be invoked from outside. The 
"startTaskManagerComponentsAndActor" method takes "RPCService" and 
"HighAvailabilityServices" as parameters and uses them to construct the 
"TaskExecutor" directly. The "selectNetworkInterfaceAndRunTaskManager" method 
generates a default "RPCService" and "HighAvailabilityServices", or creates 
them based on the configuration.
6. Fix the missing ResourceID parameter description.

Thank you for further suggestions!
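
Point 3 above can be pictured with a small, self-contained sketch. It is only an illustration under assumptions: the class AddressSelectionSketch, the helper selectAddress, the fallback host "localhost" and the port 6122 are hypothetical and are not the code in this pull request. Only java.net.InetSocketAddress is a real JDK class. The idea is that one typed value carries host and port together, so callers no longer unpack _1() and _2() from a Scala tuple.

```java
import java.net.InetSocketAddress;

final class AddressSelectionSketch {

    // Hypothetical helper: stands in for the lookup of a configured hostname and port.
    static InetSocketAddress selectAddress(String configuredHost, int configuredPort) {
        // Assumption for this sketch: fall back to "localhost" when no host is configured.
        String host = (configuredHost != null) ? configuredHost : "localhost";
        // createUnresolved skips the DNS lookup, so the sketch runs without network access.
        return InetSocketAddress.createUnresolved(host, configuredPort);
    }

    public static void main(String[] args) {
        InetSocketAddress address = selectAddress(null, 6122);
        // Host and port travel together in one value.
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```

For an unresolved address, getHostString() returns the name that was passed in. The JDK documentation notes that getHostName() may trigger a reverse name lookup when the address was created from a literal IP, which may be worth keeping in mind wherever the hostname is read back.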




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76189334
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-   resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
--- End diff --

The RPC methods and the start method have been moved to the top of the class, 
right after the TaskExecutor constructor.




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76189450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+   if (taskManagerHostname != null) {
+   LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-25 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76189882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-25 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76190061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-25 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2400
  
@mxm, could we remove the parameter "taskManagerHostname" from the 
"startTaskManagerComponentsAndActor" method? I think the hostname can be 
obtained by {{TaskExecutor}} itself, so there is no need to pass it in from the 
outside world. That would make it easier to construct a {{TaskExecutor}} 
through the factory method "startTaskManagerComponentsAndActor(configuration, 
resourceID, rpcService, haServices, localTaskManagerCommunication)" when 
designing unit tests.
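
The proposal above can be sketched roughly as follows. Everything in this block is hypothetical: RpcServiceStub, HaServicesStub, TaskExecutorStub and create are illustrative stand-ins, not classes or methods from this pull request, and taking the hostname from the RPC service is only one assumed way in which the executor could determine it by itself.

```java
import java.util.Objects;

final class TaskExecutorFactorySketch {

    /** Hypothetical stand-in for an RPC service that knows the address it is bound to. */
    interface RpcServiceStub {
        String getAddress();
    }

    /** Hypothetical stand-in for the high-availability services. */
    interface HaServicesStub {
    }

    /** Hypothetical, heavily reduced executor that keeps only what the sketch needs. */
    static final class TaskExecutorStub {
        final String hostname;
        final HaServicesStub haServices;

        TaskExecutorStub(String hostname, HaServicesStub haServices) {
            this.hostname = Objects.requireNonNull(hostname);
            this.haServices = Objects.requireNonNull(haServices);
        }
    }

    // Factory without a hostname parameter: the hostname is derived from the injected RPC service.
    static TaskExecutorStub create(RpcServiceStub rpcService, HaServicesStub haServices) {
        Objects.requireNonNull(rpcService);
        return new TaskExecutorStub(rpcService.getAddress(), haServices);
    }

    public static void main(String[] args) {
        // In a unit test, both services can be trivial fakes.
        TaskExecutorStub executor = create(() -> "test-host", new HaServicesStub() { });
        System.out.println(executor.hostname);
    }
}
```

With a signature like this, a test supplies two small fakes and never has to invent a hostname that matches the environment it runs in.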





[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-25 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76213620
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-25 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76367829
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-29 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2400
  
@mxm, I have pushed my changes and resolved the conflicts against the new 
flip-6 branch.




[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-29 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2400
  
@mxm, I have created FLINK-4505 for the TaskExecutorFactory issue. As you 
mentioned, it should be an abstract class that declares an abstract method, 
perhaps called 'createAndStartTaskExecutor()'. At least three specific 
factories (testing, yarn, standalone) would extend TaskExecutorFactory and 
implement that method. The constructor parameters of the specific factories 
differ by mode. For example, the constructor of StandaloneTaskExecutorFactory 
would take (Configuration configuration, ResourceID resourceID, 
RpcService rpcService, String taskManagerHostname, HighAvailabilityServices 
haServices, boolean localTaskManagerCommunication), and its 
'createAndStartTaskExecutor()' method can invoke the 
'startTaskManagerComponentsAndActor' method in TaskExecutor to bring up the 
TaskExecutor. Do you have any other advice? If not, I can start this subtask 
later.
https://issues.apache.org/jira/browse/FLINK-4505
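
A minimal sketch of the abstract-factory shape proposed in this comment, for 
illustration only. Every type here (SketchTaskExecutor, SketchFactory, 
StandaloneSketchFactory, YarnSketchFactory) is a hypothetical stand-in, not 
Flink code, and the constructor parameters are reduced to plain strings:

```java
import java.util.Objects;

/** Hypothetical sketch; these are stand-in types, not Flink's real classes. */
final class TaskExecutorFactorySketch {

    /** Stand-in for the TaskExecutor that a factory brings up. */
    static final class SketchTaskExecutor {
        final String mode;
        final String resourceId;
        boolean started;

        SketchTaskExecutor(String mode, String resourceId) {
            this.mode = mode;
            this.resourceId = resourceId;
        }

        void start() {
            started = true;
        }
    }

    /** Abstract factory; each subclass takes the parameters its mode needs. */
    abstract static class SketchFactory {
        protected final String resourceId;

        protected SketchFactory(String resourceId) {
            this.resourceId = Objects.requireNonNull(resourceId, "resourceId");
        }

        abstract SketchTaskExecutor createAndStartTaskExecutor();
    }

    /** Standalone mode additionally takes a hostname (assumed parameter). */
    static final class StandaloneSketchFactory extends SketchFactory {
        private final String hostname;

        StandaloneSketchFactory(String resourceId, String hostname) {
            super(resourceId);
            this.hostname = Objects.requireNonNull(hostname, "hostname");
        }

        @Override
        SketchTaskExecutor createAndStartTaskExecutor() {
            SketchTaskExecutor executor =
                new SketchTaskExecutor("standalone@" + hostname, resourceId);
            executor.start();
            return executor;
        }
    }

    /** Yarn mode only needs the resource id in this sketch. */
    static final class YarnSketchFactory extends SketchFactory {
        YarnSketchFactory(String resourceId) {
            super(resourceId);
        }

        @Override
        SketchTaskExecutor createAndStartTaskExecutor() {
            SketchTaskExecutor executor = new SketchTaskExecutor("yarn", resourceId);
            executor.start();
            return executor;
        }
    }

    public static void main(String[] args) {
        SketchFactory factory = new StandaloneSketchFactory("tm-1", "localhost");
        SketchTaskExecutor executor = factory.createAndStartTaskExecutor();
        System.out.println(executor.mode + " started=" + executor.started);
    }
}
```

Callers only depend on the abstract type, so the mode-specific parameters stay 
inside each subclass constructor.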




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-31 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2400




[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-02 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/2461

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to brin…

Implement TaskExecutorFactory as an abstract class with helper methods to 
bring up the TaskManager. Concrete subclasses of the factory can start a 
TaskManager in different modes (testing, standalone, yarn).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4505

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2461


commit 21c4b408bb0138b41e49a439f6d07f9a0ca9
Author: 淘江 
Date:   2016-09-02T10:00:49Z

[FLINK-4505][Cluster Management] Implement TaskManagerFactory to bring up 
TaskManager for different modes

Summary: above

Test Plan: NA

Reviewers: kete.yangkt

Subscribers: #blink

Differential Revision: http://phabricator.taobao.net/D5606






[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@mxm, I think the `StandaloneTaskExecutorFactory` can be used for the mini 
cluster or testing mode, and the `YarnTaskExecutorFactory` for yarn mode. 
After you confirm the implementation, I can add some tests for the factory 
if needed.




[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2461#discussion_r77329068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java
 ---
@@ -0,0 +1,198 @@
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An factory for creating {@link TaskExecutor} and starting it in yarn 
mode.
+ */
+public class YarnTaskExecutorFactory extends TaskExecutorFactory {
+
+   public YarnTaskExecutorFactory(Configuration configuration, ResourceID 
resourceID) {
+   super(configuration, resourceID);
+   }
+
+   @Override
+   public TaskExecutor createAndStartTaskExecutor() throws Exception {
+   return selectNetworkInterfaceAndRunTaskManager(configuration, 
resourceID);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param resourceID   The id of the resource which the task 
manager will run on.
+*/
+   private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
--- End diff --

The parameters differ between modes because I followed the previous 
implementation of **TaskManager**. For example, **YarnTaskManagerRunner** 
invoked the method "selectNetworkInterfaceAndRunTaskManager", while 
**ForkableFlinkMiniCluster** invoked the method 
"startTaskManagerComponentsAndActor" to start the **TaskManager**. So I 
retained the previous approach for each mode.




[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, the current implementation just follows the previous 
process for yarn and standalone modes. I agree with your opinion; the current 
approach is indeed not very clear. I think we should first re-define and 
confirm the parameters passed to the "createTaskManager" method for the 
different factories; then I can fix the internal processes based on those 
input parameters. Could you suggest the specific parameters to pass for the 
different factories? Thank you!




[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-04 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, I understand your idea as follows:
1. Provide a concrete TaskExecutorFactory class instead of an abstract 
factory for both standalone and yarn modes.
2. Pull the network selection and RPC service creation methods out of the 
factory, perhaps into some utility class, so the JM can reuse them.
3. The constructor of TaskExecutorFactory supports several different 
parameter sets, such as:
TaskExecutorFactory(Configuration, ResourceID)
TaskExecutorFactory(RpcService, HighAvailabilityServices)
TaskExecutorFactory(hostname, port)
TaskExecutorFactory(Configuration, ResourceID, RpcService, 
HighAvailabilityServices, hostname, port)
The first three constructors, which take partial parameters, delegate to the 
fourth, full constructor; all missing parameters are filled in with internal 
default values.
4. The TaskExecutorFactory exposes the method "createTaskManager" to bring 
up the TaskManager for the outside world, and this method constructs the 
related components (TaskManagerConfiguration, NetworkManager, IOManager, 
MemoryManager).
5. For testing mode, construct a TestingTaskExecutorFactory that is passed 
all the components explicitly, including (ResourceID, MemoryManager, 
IOManager, NetworkEnvironment, numberOfSlots, RpcService, 
HighAvailabilityServices). Should the TaskManagerConfiguration be passed 
from outside or generated implicitly?
6. In addition, the localTaskManagerCommunication parameter was previously 
needed to decide whether to create a NettyConfig for standalone or yarn 
mode. I will now remove this parameter and always create the NettyConfig.
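
The constructor delegation described in point 3 can be sketched roughly as 
below. This is only an illustration: TelescopingFactorySketch, its fields, and 
the default values ("localhost", port 0) are assumptions made for the example 
and are not taken from the PR.

```java
import java.util.Objects;

/** Hypothetical sketch of constructor delegation; not Flink's real factory. */
final class TelescopingFactorySketch {
    // Assumed defaults, chosen only for illustration.
    static final String DEFAULT_HOSTNAME = "localhost";
    static final int DEFAULT_PORT = 0;

    final String resourceId;
    final String hostname;
    final int port;

    /** Partial constructor: missing values come from the defaults above. */
    TelescopingFactorySketch(String resourceId) {
        this(resourceId, DEFAULT_HOSTNAME, DEFAULT_PORT);
    }

    /** Full constructor: every partial constructor ends up here. */
    TelescopingFactorySketch(String resourceId, String hostname, int port) {
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId");
        this.hostname = Objects.requireNonNull(hostname, "hostname");
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        this.port = port;
    }

    String describe() {
        return resourceId + "@" + hostname + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(new TelescopingFactorySketch("tm-1").describe());
        System.out.println(new TelescopingFactorySketch("tm-1", "host-a", 6122).describe());
    }
}
```

Keeping validation in the full constructor means the partial constructors 
cannot bypass it.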




[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-05 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
Yes, that also makes sense. For testing purposes it is very clear, so we do 
not need to do anything currently; all the components in the `TaskManager` 
constructor can be mocked implicitly.

For `TaskManagerRunner`, the purpose is to pull the initialization of the 
related components out of `TaskManager` to make its logic clear. One issue 
remains to be confirmed: should we provide several static methods with 
different parameter sets for the outside world, or just one static method 
such as 
'selectNetworkInterfaceAndRunTaskManager(`Configuration` 
configuration, `ResourceID` resourceID)'? 
I think providing several methods with different parameters may be reasonable, 
because callers may want to pass in some parameters such as 'hostname', 
'port', `RpcService`, and `HighAvailabilityServices` from outside.




[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
@tillrohrmann, I created the `TaskExecutorRunner` to construct the 
components needed for starting the `TaskExecutor`, and removed the factory 
class. Currently only the `ResourceID` and `Configuration` parameters must be 
passed to the method `startComponents`; the other parameters are optional. 
After you confirm this approach, I will write some tests based on it. 
The `MemoryLogger` is removed from `TaskExecutorRunner` temporarily. I think 
the `ActorGateway` should not be passed directly to the constructor of 
`MemoryLogger`. If you agree, I will restructure the `MemoryLogger` in 
another jira.




[GitHub] flink pull request #3113: [FLINK-4912] Introduce RECONCILIATING state in Exe...

2017-01-13 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3113

[FLINK-4912] Introduce RECONCILIATING state in ExecutionGraph and Exe…

This is part of the non-disruptive JobManager failure recovery.

Add a JobStatus and ExecutionState `RECONCILING`.
If a job is started on a JobManager for master recovery, the job status 
and all of its executions transition to the `RECONCILING` state.

From `RECONCILING`, an execution can go to any existing task state 
(once the execution is reconciled with the TaskManager).
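
As a rough illustration of the transitions described above, the sketch below 
models only the edges that involve RECONCILING, using the four targets listed 
in the state diagram of this PR (RUNNING, FINISHED, CANCELED, FAILED). The 
class ReconcilingStateSketch and its helper methods are hypothetical, and the 
other states and edges of the real ExecutionState are deliberately left out.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, partial state machine; not Flink's ExecutionState. */
final class ReconcilingStateSketch {

    enum State { CREATED, RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

    /** Returns the targets this sketch allows from the given state. */
    static Set<State> targets(State from) {
        switch (from) {
            case CREATED:
                // Only the recovery path is modelled here.
                return EnumSet.of(State.RECONCILING);
            case RECONCILING:
                return EnumSet.of(
                    State.RUNNING, State.FINISHED, State.CANCELED, State.FAILED);
            default:
                // All other edges are outside the scope of this sketch.
                return EnumSet.noneOf(State.class);
        }
    }

    static boolean canTransition(State from, State to) {
        return targets(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.CREATED, State.RECONCILING));
        System.out.println(canTransition(State.RUNNING, State.RECONCILING));
    }
}
```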

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-4912

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3113


commit 0fbd628b9b8817fd1b71faca92d87c56213d79f6
Author: 淘江 
Date:   2017-01-13T08:41:37Z

[FLINK-4912] Introduce RECONCILIATING state in ExecutionGraph and Execution 
for JobManager failure recovery






[GitHub] flink pull request #2907: [FLINK-5135][JM]ResourceProfile for slot request s...

2017-01-13 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2907




[GitHub] flink pull request #2906: [FLINK-5132][FLINK-5133][FLINK-5134][Core,DataStre...

2017-01-13 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2906




[GitHub] flink pull request #3114: [FLINK-5132]Introduce the ResourceSpec for groupin...

2017-01-13 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3114

[FLINK-5132]Introduce the ResourceSpec for grouping different resourc…

This is part of the fine-grained resource configuration.

The current resource factors include CPU cores, heap memory, direct memory, 
native memory and state size.
The **ResourceSpec** will provide some basic constructors for grouping 
different resource factors as needed, and it can be extended easily for 
further requirements.
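
A minimal sketch of what such a grouping could look like, assuming an 
immutable value class. The name ResourceSpecSketch, the megabyte units, the 
two constructors and the merge method are all assumptions for illustration; 
the actual ResourceSpec in the PR may differ.

```java
/** Hypothetical sketch of a resource grouping; not the ResourceSpec from the PR. */
final class ResourceSpecSketch {
    final double cpuCores;
    final int heapMemoryMb;
    final int directMemoryMb;
    final int nativeMemoryMb;
    final int stateSizeMb;

    /** Basic constructor: only CPU and heap, the rest defaults to zero. */
    ResourceSpecSketch(double cpuCores, int heapMemoryMb) {
        this(cpuCores, heapMemoryMb, 0, 0, 0);
    }

    /** Full constructor covering all five resource factors. */
    ResourceSpecSketch(double cpuCores, int heapMemoryMb, int directMemoryMb,
                       int nativeMemoryMb, int stateSizeMb) {
        if (cpuCores < 0 || heapMemoryMb < 0 || directMemoryMb < 0
                || nativeMemoryMb < 0 || stateSizeMb < 0) {
            throw new IllegalArgumentException("resource factors must be non-negative");
        }
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        this.directMemoryMb = directMemoryMb;
        this.nativeMemoryMb = nativeMemoryMb;
        this.stateSizeMb = stateSizeMb;
    }

    /** Sums two specs factor by factor, e.g. for operators sharing a slot. */
    ResourceSpecSketch merge(ResourceSpecSketch other) {
        return new ResourceSpecSketch(
            cpuCores + other.cpuCores,
            heapMemoryMb + other.heapMemoryMb,
            directMemoryMb + other.directMemoryMb,
            nativeMemoryMb + other.nativeMemoryMb,
            stateSizeMb + other.stateSizeMb);
    }

    public static void main(String[] args) {
        ResourceSpecSketch merged =
            new ResourceSpecSketch(1.0, 100).merge(new ResourceSpecSketch(2.0, 200, 10, 0, 0));
        System.out.println(merged.cpuCores + " cores, " + merged.heapMemoryMb + " MB heap");
    }
}
```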

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-5132

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3114


commit 8689e0d4f81de9f2b704d425aa56ca3f5e792f29
Author: 淘江 
Date:   2017-01-13T10:40:47Z

[FLINK-5132]Introduce the ResourceSpec for grouping different resource 
factors in API






[GitHub] flink pull request #3113: [FLINK-4912] Introduce RECONCILIATING state in Exe...

2017-01-13 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3113#discussion_r95975088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
 ---
@@ -25,16 +25,23 @@
  * {@code
  *
  * CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
- * ||  |
- * ||   +--+
- * |V   V
- * | CANCELLING -+> CANCELED
- * | |
- * +-+
+ *| ||  |
+ *| ||   +--+
+ *| |V   V
+ *| | CANCELLING -+> CANCELED
+ *| | |
+ *|+-+
+ *|
+ *|   ... -> FAILED
+ *   V
+ *RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
  *
- *   ... -> FAILED
  * }
  *
+ * It is possible to enter the {@code RECONCILING} state from {@code 
CREATED}
--- End diff --

Thank you for suggestions of the format. :)




[GitHub] flink pull request #3125: [FLINK-5499][JobManager]Reuse the resource locatio...

2017-01-16 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3125

[FLINK-5499][JobManager]Reuse the resource location of prior executio…

Currently, when scheduling an execution requests a slot from the 
**SlotPool**, the **TaskManagerLocation** parameter is an empty collection. So 
in a task failover scenario, the new execution attempt may be deployed to a 
different task manager. If RocksDB is set as the state backend, performance is 
better when the data can be restored from the local machine. So we try to 
reuse the **TaskManagerLocation** of the prior execution attempt when 
allocating a slot from the **SlotPool**. If the prior executions have no 
**TaskManagerLocation**, the behavior is the same as it is today.
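
The idea can be sketched as follows, with locations reduced to plain strings. 
PriorLocationSketch and both of its methods are hypothetical helpers written 
for this illustration; they do not mirror the SlotPool API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of location reuse; not the real SlotPool logic. */
final class PriorLocationSketch {

    /** Prior location if known, otherwise an empty list (today's behavior). */
    static List<String> preferredLocations(String priorLocation) {
        if (priorLocation == null) {
            return Collections.<String>emptyList();
        }
        return Collections.singletonList(priorLocation);
    }

    /** Picks a preferred location when available, else the first free one. */
    static String allocate(List<String> preferred, List<String> available) {
        if (available.isEmpty()) {
            throw new IllegalStateException("no slot available");
        }
        for (String location : preferred) {
            if (available.contains(location)) {
                return location;
            }
        }
        return available.get(0);
    }

    public static void main(String[] args) {
        List<String> available = Arrays.asList("tm-1", "tm-2");
        System.out.println(allocate(preferredLocations("tm-2"), available));
        System.out.println(allocate(preferredLocations(null), available));
    }
}
```

The preference is soft in this sketch: if the prior task manager has no free 
slot, allocation falls back to any available location.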

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-5499

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3125


commit ab2e24ae7e82be45359f249670f72664226ec18c
Author: 淘江 
Date:   2017-01-16T09:28:19Z

[FLINK-5499][JobManager]Reuse the resource location of prior execution 
attempt in allocating slot






[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...

2017-01-18 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3151

[FLINK-4364][runtime]mplement TaskManager side of heartbeat from JobM…

1. **TaskManagerRunner** creates the **HeartbeatManagerImpl** component when 
it constructs the **TaskManager**.
2. **TaskManager** begins to monitor the JobManager once registration at the 
new job leader succeeds.
3. Currently the payload is null for the interaction between 
**TaskManager** and **JobManager**; it can be extended later if needed. Also, 
the heartbeat timeout logic is not implemented yet and may be realized in the 
failover detection issue.
4. The **JobManager** side will be implemented in another jira.

@tillrohrmann, please review!
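
The monitoring flow in the steps above might be sketched like this. It is a 
simplified, single-threaded illustration with an explicit clock argument; 
HeartbeatMonitorSketch and TimeoutListener are hypothetical stand-ins and do 
not reproduce the HeartbeatManagerImpl API.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical heartbeat monitor; not Flink's HeartbeatManagerImpl. */
final class HeartbeatMonitorSketch {

    /** Callback invoked once when a monitored target times out. */
    interface TimeoutListener {
        void notifyHeartbeatTimeout(String resourceId);
    }

    private final long timeoutMillis;
    private final TimeoutListener listener;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis, TimeoutListener listener) {
        this.timeoutMillis = timeoutMillis;
        this.listener = listener;
    }

    /** Starts monitoring, e.g. after registration at the job leader succeeds. */
    void monitorTarget(String resourceId, long nowMillis) {
        lastHeartbeat.put(resourceId, nowMillis);
    }

    /** Records a heartbeat; heartbeats from unmonitored targets are ignored. */
    void receiveHeartbeat(String resourceId, long nowMillis) {
        lastHeartbeat.computeIfPresent(resourceId, (id, previous) -> nowMillis);
    }

    /** Removes and reports every target whose last heartbeat is too old. */
    void checkTimeouts(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > timeoutMillis) {
                String resourceId = entry.getKey();
                it.remove();
                listener.notifyHeartbeatTimeout(resourceId);
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(
            100L, id -> System.out.println("timeout: " + id));
        monitor.monitorTarget("jm-1", 0L);
        monitor.receiveHeartbeat("jm-1", 80L);
        monitor.checkTimeouts(150L); // 70 ms since last heartbeat, still alive
        monitor.checkTimeouts(200L); // 120 ms since last heartbeat, times out
    }
}
```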

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-4364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3151


commit 362f717b0ec50d8cc864d2f2efd432a5efbfd24a
Author: 淘江 
Date:   2017-01-18T11:08:19Z

[FLINK-4364][runtime]mplement TaskManager side of heartbeat from JobManager






[GitHub] flink pull request #2798: [FLINK-4364]Implement TaskManager side of heartbea...

2017-01-18 Thread wangzhijiang999
Github user wangzhijiang999 closed the pull request at:

https://github.com/apache/flink/pull/2798




[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...

2017-02-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3113
  
@StephanEwen, thank you for the concrete suggestions. Sorry for the delayed 
response; I was away for the Chinese Spring Festival holiday.

I have considered this and added some tests that validate the state 
transitions of the state machine. They depend on the later processes and will 
be submitted together with them in the following PRs.

I totally agree with the considerations about the possible state 
transitions above. I plan to give a detailed explanation of my implementation 
in another jira soon. The whole change is actually a bit complex, so I am 
trying to break it down into small pieces that can be reviewed and merged 
quickly.





[GitHub] flink issue #3125: [FLINK-5499][JobManager]Reuse the resource location of pr...

2017-02-02 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3125
  
@StephanEwen 
Yes, the current concern focuses only on state restore performance. 
This PR does not consider all the scenarios, and it may be only the first step 
of the slot location implementation.

If the location does not exist, other strategies can be added to decide the 
locations, such as co-locating with the input for batch jobs as you mentioned. 
That can be the second step of the implementation.

I look forward to your further comments!




[GitHub] flink issue #3113: [FLINK-4912] Introduce RECONCILIATING state in ExecutionG...

2017-02-03 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3113
  
@StephanEwen, I have already created #5703 for the detailed recovery process, 
and it may cover your considerations. I look forward to your response, thank 
you!




[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99731288
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
final TaskManagerLocation taskManagerLocation,
final UUID leaderId,
@RpcTimeout final Time timeout);
+
+   /**
+* Send the heartbeat to job manager from task manager
+*
+* @param resourceID unique id of the task manager
+* @param payload the payload information from the task manager
+*/
+   void heartbeatFromTaskManager(final ResourceID resourceID, final Object 
payload);
--- End diff --

We have not yet decided which specific payload information should be 
attached to the heartbeat, so Object is used as a workaround. It should be 
changed to a specific type once that is confirmed. Which payload do you think 
needs to be attached at this point?
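
One possible direction for replacing the Object parameter, sketched under 
assumptions: a type parameter on the receiving interface lets the payload type 
be fixed later without changing the method shape. HeartbeatReceiver, SlotUsage 
and RecordingReceiver are hypothetical names, and the slot-usage payload is 
invented purely for the example; the source does not say what the payload 
should contain.

```java
/** Hypothetical sketch of a typed heartbeat payload; not the JobMasterGateway API. */
final class TypedPayloadSketch {

    /** Receiver whose payload type is chosen by the implementer. */
    interface HeartbeatReceiver<P> {
        void heartbeatFromTaskManager(String resourceId, P payload);
    }

    /** Invented example payload; the real payload is undecided in the source. */
    static final class SlotUsage {
        final int usedSlots;
        final int totalSlots;

        SlotUsage(int usedSlots, int totalSlots) {
            this.usedSlots = usedSlots;
            this.totalSlots = totalSlots;
        }
    }

    /** Receiver that simply remembers the most recent heartbeat. */
    static final class RecordingReceiver implements HeartbeatReceiver<SlotUsage> {
        String lastSender;
        SlotUsage lastPayload;

        @Override
        public void heartbeatFromTaskManager(String resourceId, SlotUsage payload) {
            lastSender = resourceId;
            lastPayload = payload;
        }
    }

    public static void main(String[] args) {
        RecordingReceiver receiver = new RecordingReceiver();
        receiver.heartbeatFromTaskManager("tm-1", new SlotUsage(2, 4));
        System.out.println(receiver.lastSender + " uses "
            + receiver.lastPayload.usedSlots + "/" + receiver.lastPayload.totalSlots);
    }
}
```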




[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732047
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
}
}
 
+   /**
+* The heartbeat listener for JobManager and ResourceManager, they can 
be distinguished by ResourceID
+* and trigger different processes.
+*/
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
--- End diff --

On the TaskExecutor side, it monitors the JM and RM that initiate the 
heartbeat, so JMRMHeartbeatListener signals a heartbeat timeout with either 
the JM or the RM. The current PR only shows the interaction with the JM. I 
intended to submit the RM part in another PR, but submitting both sides 
together may make it easier to understand.
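
A rough sketch of how a single listener could tell the two sources apart by 
resource ID, as the Javadoc in the diff describes. JmRmListenerSketch, its 
registration methods and the Source enum are hypothetical; what the 
TaskExecutor should actually do on each kind of timeout is not specified in 
this thread, so the sketch only classifies the timeout.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical listener sketch; not the JMRMHeartbeatListener from the PR. */
final class JmRmListenerSketch {

    enum Source { JOB_MANAGER, RESOURCE_MANAGER, UNKNOWN }

    private final Set<String> jobManagerIds = new HashSet<>();
    private String resourceManagerId;

    void registerJobManager(String resourceId) {
        jobManagerIds.add(resourceId);
    }

    void registerResourceManager(String resourceId) {
        resourceManagerId = resourceId;
    }

    /** Classifies the timed-out target and forgets it. */
    Source notifyHeartbeatTimeout(String resourceId) {
        if (jobManagerIds.remove(resourceId)) {
            // A JM-specific reaction would go here (left open in the source).
            return Source.JOB_MANAGER;
        }
        if (resourceId.equals(resourceManagerId)) {
            // An RM-specific reaction would go here (left open in the source).
            resourceManagerId = null;
            return Source.RESOURCE_MANAGER;
        }
        return Source.UNKNOWN;
    }

    public static void main(String[] args) {
        JmRmListenerSketch listener = new JmRmListenerSketch();
        listener.registerJobManager("jm-1");
        listener.registerResourceManager("rm-1");
        System.out.println(listener.notifyHeartbeatTimeout("jm-1"));
        System.out.println(listener.notifyHeartbeatTimeout("rm-1"));
    }
}
```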




[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732610
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
}
}
 
+   /**
+    * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
+    * and trigger different processes.
+    */
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
+
+       JMRMHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+           log.info("Notify heartbeat timeout for resourceID {}", resourceID);
--- End diff --

Yes, a timeout should actually trigger some actions. I have not submitted 
this part yet because I think it belongs to the failure detection logic, which 
I intended to submit in another PR. To make the heartbeat mechanism complete, 
I will add this part in the upcoming modifications.




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99733319
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

I agree. Initially I wanted to reuse the executor in **RPCService**, but that 
needs some modifications. I will introduce it in the Runner so that it can be 
shared among components.




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99734627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
            jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
        }
 
+       heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
+           @Override
+           public void sendHeartbeat(ResourceID resourceID, Object payload) {
+               jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
+           }
+
+           @Override
+           public void requestHeartbeat(ResourceID resourceID, Object payload) {
+               throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
--- End diff --

My previous understanding was that heartbeats are only requested by the RM 
and JM from the TM, and that the TM only responds to them. Do you mean that 
heartbeats can be requested from both sides? If so, the TM also needs to 
schedule heartbeat requests at a regular interval.




[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann, thank you for the detailed review and comments!

This PR only submits the heartbeat logic on the TM side, because there is 
already a JIRA issue for the JM side of the heartbeat.

In my implementation, the JM initiates the heartbeat with 
**HeartbeatManagerSenderImpl** and the TM responds to it with 
**HeartbeatManagerImpl**, so the heartbeat process is one-way.

I think it is better to also submit the JM heartbeat logic in this PR to make 
it easier to understand. I will update this PR soon. As for testing, unit 
tests for the basic heartbeat logic already exist. Do you mean I should add 
some ITCases?
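
For readers following the thread, the one-way flow described above can be 
sketched roughly as follows. This is only an illustration under assumptions: 
`Target`, `Sender` and `Responder` are hypothetical stand-ins, not the 
**HeartbeatTarget**, **HeartbeatManagerSenderImpl** or **HeartbeatManagerImpl** 
classes of the PR, and the clock is injected so the timeout check is 
deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Sketch of a one-way heartbeat: one side requests, the other only responds. */
final class OneWayHeartbeatSketch {

    /** Hypothetical stand-in for a heartbeat target; not Flink's interface. */
    interface Target {
        void requestHeartbeat(String requesterId);
        void receiveHeartbeat(String responderId);
    }

    /** Initiating side (the JM role): requests heartbeats and records responses. */
    static final class Sender implements Target {
        private final String id;
        private final long timeoutMillis;
        private final LongSupplier clock;
        private final Map<String, Target> targets = new HashMap<>();
        private final Map<String, Long> lastHeartbeat = new HashMap<>();

        Sender(String id, long timeoutMillis, LongSupplier clock) {
            this.id = id;
            this.timeoutMillis = timeoutMillis;
            this.clock = clock;
        }

        void monitorTarget(String targetId, Target target) {
            targets.put(targetId, target);
            lastHeartbeat.put(targetId, clock.getAsLong());
        }

        /** Would be invoked periodically by a scheduler in a real system. */
        void requestHeartbeats() {
            for (Target target : targets.values()) {
                target.requestHeartbeat(id);
            }
        }

        @Override
        public void receiveHeartbeat(String responderId) {
            if (targets.containsKey(responderId)) {
                lastHeartbeat.put(responderId, clock.getAsLong());
            }
        }

        @Override
        public void requestHeartbeat(String requesterId) {
            throw new UnsupportedOperationException("The sender is never asked for a heartbeat in this sketch.");
        }

        boolean hasTimedOut(String targetId) {
            Long last = lastHeartbeat.get(targetId);
            return last == null || clock.getAsLong() - last > timeoutMillis;
        }
    }

    /** Responding side (the TM role): answers requests, never initiates. */
    static final class Responder implements Target {
        private final String id;
        private final Target sender;
        private boolean alive = true;

        Responder(String id, Target sender) {
            this.id = id;
            this.sender = sender;
        }

        void stop() {
            alive = false;
        }

        @Override
        public void requestHeartbeat(String requesterId) {
            if (alive) {
                sender.receiveHeartbeat(id);
            }
        }

        @Override
        public void receiveHeartbeat(String responderId) {
            throw new UnsupportedOperationException("The responder never receives heartbeats in this sketch.");
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        Sender jm = new Sender("jm", 100L, () -> now[0]);
        Responder tm = new Responder("tm", jm);
        jm.monitorTarget("tm", tm);

        now[0] = 80L;
        jm.requestHeartbeats();
        System.out.println("timed out while alive: " + jm.hasTimedOut("tm"));

        tm.stop();
        now[0] = 250L;
        jm.requestHeartbeats();
        System.out.println("timed out after stop: " + jm.hasTimedOut("tm"));
    }
}
```

If heartbeats had to be requested from both sides, each side would need both 
roles, which is the scheduling cost raised earlier in the review.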




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99736386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -126,6 +129,9 @@
/** The metric registry in the task manager */
private final MetricRegistry metricRegistry;
 
+   /** The heartbeat manager for job manager and resource manager in the task manager */
+   private final HeartbeatManagerImpl heartbeatManager;
--- End diff --

Yes, the current **HeartbeatManagerImpl** is sufficient, so I did not wrap 
it.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99736682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
final UUID resourceManagerLeaderId,
final String taskExecutorAddress,
final ResourceID resourceID,
-   final SlotReport slotReport) {
+   final SlotReport slotReport)
+   {
--- End diff --

This was probably an accidental change; I will be more careful next time.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -137,6 +140,7 @@ public ResourceManager(
this.jobManagerRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
this.leaderSessionId = null;
+   this.resourceID = ResourceID.generate();
--- End diff --

Yes, in the current implementation it is determined by 
**ResourceManagerRunner**. This PR was submitted a long time ago, and some of 
the work it depends on was not complete at that time.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
 * @param optionalDiagnostics
 */
    void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+   /**
+    * sends the heartbeat to resource manager from task manager
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information of the task manager
+    */
+   void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

I agree.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1099,4 +1121,23 @@ public void run() {
});
}
}
+
+   private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+       ResourceManagerHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(ResourceID resourceID) {
+       }
--- End diff --

It will be added in the upcoming modifications.




[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737255
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

It has already been added in the new modifications.




[GitHub] flink issue #2770: [FLINK-4354]Implement TaskManager side of heartbeat from ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2770
  
@tillrohrmann, thank you for the review!

Actually, this PR is out of date: it is based on the flip-6 branch, and my 
subsequent modifications were not pushed to it. It should be re-submitted 
based on master, and I forgot to close it.

Some of the suggestions above have already been addressed in the current 
implementation, and I want to merge this part into #3151 so that it can be 
reviewed more easily.




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-08 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r100030725
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
            jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
        }
 
+       heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
+           @Override
+           public void sendHeartbeat(ResourceID resourceID, Object payload) {
+               jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
+           }
+
+           @Override
+           public void requestHeartbeat(ResourceID resourceID, Object payload) {
+               throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
--- End diff --

If I understand correctly, the TM can request heartbeats itself and the JM 
can also send heartbeats itself? 
If so, **requestHeartbeat** will still never be called by the current logic 
on the TM side. Alternatively, I could remove the 
**UnsupportedOperationException** so as not to impose this restriction and 
leave the method empty for now?




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-08 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r100031031
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

Do you have any suggestions on where it would be suitable to introduce the 
**ScheduledExecutorService**?




[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

2017-02-08 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann, I am preparing the test code and can submit the updates this 
week. Thank you for your continuous help!




[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-08 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r100222823
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics
        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
 
+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

Yes, I totally agree. The advantage is that it avoids introducing an extra 
thread pool and runs all the RPC messages in a single executor, provided 
**RPCService** can handle this well.




[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

2017-02-10 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann, I have submitted updates that should cover your suggestions.

There are still two issues that I am not sure about.

First, the default values for the heartbeat interval and timeout in 
**ConfigConstants** have not been validated yet, so please adjust them based 
on your experience.

Second, regarding the introduction of a **ScheduledExecutorService** in 
**RPCService**: my initial idea was to use the existing scheduler in 
**RPCService**, but it cannot be obtained from the **AkkaRPCService** 
implementation. Another way would be to replace the current 
**ScheduledExecutorService** parameter in the constructor of 
**HeartbeatManagerSenderImpl** with **RPCService**, so that the 
**RPCService** also schedules the heartbeat requests. However, the return 
value of the **scheduleRunnable** method in **RPCService** conflicts with the 
one expected in **HeartbeatManagerSenderImpl**. So for now I have just added 
another single-thread pool to **RPCService**. Maybe the number of threads in 
the pool could be based on the number of CPU cores.
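
As a rough illustration of that last idea, a shared scheduler sized by the 
number of CPU cores could look like the sketch below. It uses only the 
standard `java.util.concurrent` API; the class and the two helper methods 
(`newSharedScheduler`, `scheduleHeartbeatRequests`) are hypothetical names 
made up for this example and are not part of **RPCService**.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: one scheduler, sized by CPU cores, shared by several components. */
final class SharedSchedulerSketch {

    /** Creates a scheduled pool with one thread per available core (at least one). */
    static ScheduledExecutorService newSharedScheduler() {
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors());
        return Executors.newScheduledThreadPool(threads);
    }

    /** Runs the given heartbeat request immediately and then at a fixed rate. */
    static ScheduledFuture<?> scheduleHeartbeatRequests(
            ScheduledExecutorService scheduler, Runnable request, long intervalMillis) {
        return scheduler.scheduleAtFixedRate(request, 0L, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = newSharedScheduler();
        CountDownLatch threeRequests = new CountDownLatch(3);

        // The Runnable stands in for "send a heartbeat request to all targets".
        ScheduledFuture<?> task = scheduleHeartbeatRequests(scheduler, threeRequests::countDown, 10L);

        boolean fired = threeRequests.await(5, TimeUnit.SECONDS);
        task.cancel(false);
        scheduler.shutdownNow();
        System.out.println("three heartbeat requests fired: " + fired);
    }
}
```

The `ScheduledFuture` returned here is what lets the caller cancel the 
periodic request again, for example when a target is no longer monitored.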

There may still be things to polish, and I am happy to make further changes 
based on your comments.
BTW, the heartbeat interaction between TM and RM will be submitted in another 
PR once this one is confirmed, because the two share some common points.




[GitHub] flink pull request #3303: [FLINK-5133][core] Add new setResource API for Dat...

2017-02-13 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3303

[FLINK-5133][core] Add new setResource API for DataStream and DataSet

This is part of the fine-grained resource configuration.
For **DataStream**, the **setResource** API is set on 
**SingleOutputStreamOperator**, similar to other existing properties such as 
parallelism, name, etc.
For **DataSet**, the **setResource** API is set on **Operator** in a similar 
way.
The API takes two parameters, a minimum **ResourceSpec** and a maximum 
**ResourceSpec**, to allow for dynamic resource resizing in future 
improvements.
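
A minimal sketch of what such a two-parameter setter could look like is given 
below. It is an assumption-laden illustration, not the code of this PR: the 
`ResourceSpec` here is a hypothetical stand-in with two made-up fields, and 
`Operator` is a toy class rather than Flink's **Operator** or 
**SingleOutputStreamOperator**.

```java
/** Sketch of a fluent setResource(min, max) setter next to setParallelism. */
final class SetResourceSketch {

    /** Hypothetical resource description; the two fields are illustrative only. */
    static final class ResourceSpec {
        final double cpuCores;
        final long heapMemoryMB;

        ResourceSpec(double cpuCores, long heapMemoryMB) {
            if (cpuCores < 0 || heapMemoryMB < 0) {
                throw new IllegalArgumentException("Resources must be non-negative.");
            }
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }

        /** True if every dimension of this spec is at most that of the other spec. */
        boolean fitsWithin(ResourceSpec other) {
            return cpuCores <= other.cpuCores && heapMemoryMB <= other.heapMemoryMB;
        }
    }

    /** Toy operator holding properties in the same fluent style as parallelism. */
    static final class Operator {
        private final String name;
        private int parallelism = 1;
        private ResourceSpec minResource;
        private ResourceSpec maxResource;

        Operator(String name) {
            this.name = name;
        }

        Operator setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        /** Stores the resource range; min must not exceed max in any dimension. */
        Operator setResource(ResourceSpec min, ResourceSpec max) {
            if (!min.fitsWithin(max)) {
                throw new IllegalArgumentException("Minimum resource exceeds maximum resource.");
            }
            this.minResource = min;
            this.maxResource = max;
            return this;
        }

        String getName() { return name; }
        int getParallelism() { return parallelism; }
        ResourceSpec getMinResource() { return minResource; }
        ResourceSpec getMaxResource() { return maxResource; }
    }

    public static void main(String[] args) {
        Operator map = new Operator("map")
            .setParallelism(4)
            .setResource(new ResourceSpec(1.0, 512), new ResourceSpec(2.0, 1024));
        System.out.println(map.getName() + " min cores: " + map.getMinResource().cpuCores
            + ", max cores: " + map.getMaxResource().cpuCores);
    }
}
```

Keeping the minimum and maximum as separate arguments leaves room for a 
scheduler to pick any value inside the range later on.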

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-5133

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3303.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3303


commit de7c4c75c9f1c80bf7e529a1ff05fa870c2df1cb
Author: 淘江 
Date:   2017-02-14T04:37:18Z

[FLINK-5133][core] Add new setResource API for DataStream and DataSet






[GitHub] flink issue #3303: [FLINK-5133][core] Add new setResource API for DataStream...

2017-02-13 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3303
  
@StephanEwen, this PR includes a new API that would be visible to users, but 
it cannot work completely yet because the corresponding runtime code has not 
been submitted. To avoid confusing users, I could change this PR to hide the 
API temporarily before it is merged into master. What do you think?




[GitHub] flink pull request #3340: [FLINK-5703][runtime]Job manager failure recovery ...

2017-02-17 Thread wangzhijiang999
GitHub user wangzhijiang999 opened a pull request:

https://github.com/apache/flink/pull/3340

[FLINK-5703][runtime]Job manager failure recovery via reconciliation with 
TaskManager reports

This is part of [Non-disruptive JobManager Failures via 
Reconciliation](https://issues.apache.org/jira/browse/FLINK-4911).

The design doc for this part is attached to the JIRA issue. It mainly covers 
the following work:

- **RECONCILING** state transition for job status and execution.
- Recovery of the data structures **ExecutionGraph**, **ExecutionVertex** and 
**Execution** based on **TaskManager** reports.
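
To make the idea of a **RECONCILING** phase more concrete, here is a small 
state-machine sketch. The transition table, the timeout handling and all 
names in it (`ReconcilingSketch`, `onTaskReport`, `onReconcileTimeout`) are 
assumptions made for illustration; the actual transitions are defined in the 
design doc and in this PR, not here.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: a job waits in RECONCILING until all expected tasks have reported. */
final class ReconcilingSketch {

    enum JobStatus { CREATED, RECONCILING, RUNNING, FAILING }

    /** Assumed transition table; not taken from the design doc. */
    private static final Map<JobStatus, Set<JobStatus>> ALLOWED = new EnumMap<>(JobStatus.class);
    static {
        ALLOWED.put(JobStatus.CREATED, EnumSet.of(JobStatus.RECONCILING, JobStatus.RUNNING));
        ALLOWED.put(JobStatus.RECONCILING, EnumSet.of(JobStatus.RUNNING, JobStatus.FAILING));
        ALLOWED.put(JobStatus.RUNNING, EnumSet.of(JobStatus.FAILING));
        ALLOWED.put(JobStatus.FAILING, EnumSet.noneOf(JobStatus.class));
    }

    private final Set<String> expectedTasks;
    private final Set<String> reportedTasks = new HashSet<>();
    private JobStatus status = JobStatus.CREATED;

    ReconcilingSketch(Set<String> expectedTasks) {
        this.expectedTasks = new HashSet<>(expectedTasks);
    }

    JobStatus getStatus() {
        return status;
    }

    private void transition(JobStatus next) {
        if (!ALLOWED.get(status).contains(next)) {
            throw new IllegalStateException(status + " -> " + next + " is not allowed");
        }
        status = next;
    }

    /** Called when a new leader takes over a job that was already running. */
    void startReconciling() {
        transition(JobStatus.RECONCILING);
    }

    /** Records a task report; once every expected task has reported, the job runs again. */
    void onTaskReport(String taskId) {
        if (status != JobStatus.RECONCILING || !expectedTasks.contains(taskId)) {
            return;
        }
        reportedTasks.add(taskId);
        if (reportedTasks.containsAll(expectedTasks)) {
            transition(JobStatus.RUNNING);
        }
    }

    /** If some tasks never report, reconciliation is given up. */
    void onReconcileTimeout() {
        if (status == JobStatus.RECONCILING) {
            transition(JobStatus.FAILING);
        }
    }

    public static void main(String[] args) {
        ReconcilingSketch job = new ReconcilingSketch(new HashSet<>(Arrays.asList("task-1", "task-2")));
        job.startReconciling();
        job.onTaskReport("task-1");
        System.out.println(job.getStatus());
        job.onTaskReport("task-2");
        System.out.println(job.getStatus());
    }
}
```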

Two parts of the whole feature are still open:

- The related modifications on the **TaskManager** side will be submitted 
under another JIRA issue, including not cancelling the tasks when notified of 
a job leader change, reporting the task status to the **JobManager**, etc.
- This PR has no effect on the current master branch, so it is safe to merge. 
The reconcile logic should be triggered by **JobManagerRunner** when it is 
granted leadership, but that depends on [Determine whether the job starts 
from last JobManager 
failure](https://issues.apache.org/jira/browse/FLINK-5501), so I will modify 
the **JobManagerRunner** logic after 
[FLINK-5501](https://issues.apache.org/jira/browse/FLINK-5501) is merged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangzhijiang999/flink FLINK-5703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3340.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3340


commit 7337754bd8d12808a328b55e184b0fe59cf6e11d
Author: 淘江 
Date:   2017-02-17T08:41:44Z

[FLINK-5703][runtime]Job manager failure recovery based on reconciliation 
with TaskManager reports






[GitHub] flink issue #3340: [FLINK-5703][runtime]Job manager failure recovery via rec...

2017-02-17 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3340
  
@StephanEwen, I would appreciate your review. Thank you for any comments!

