[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-22 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2451


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77757943
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 
 
/**
-*
-* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+* Register a taskExecutor at the resource manager
+* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 * @param taskExecutorAddress  The address of the TaskExecutor that registers
 * @param resourceID   The resource ID of the TaskExecutor that registers
 *
 * @return The response by the ResourceManager.
 */
@RpcMethod
-   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-   UUID resourceManagerLeaderId,
-   String taskExecutorAddress,
-   ResourceID resourceID) {
+   public Future<RegistrationResponse> registerTaskExecutor(
+   final UUID resourceManagerLeaderId,
+   final String taskExecutorAddress,
+   final ResourceID resourceID) {
+
+   if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+   log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+   resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+   return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
--- End diff --

Would you prefer to send a Decline message in this case?


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77339204
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerTest.java ---
@@ -0,0 +1,85 @@
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.UnmatchedLeaderSessionIDException;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerTest {
+
+   private TestingSerialRpcService rpcService;
+
+   @Before
+   public void setup() throws Exception {
+   rpcService = new TestingSerialRpcService();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   rpcService.stopService();
+   }
+
+   /**
+* Test registerTaskExecutor, including normal registration, registration with unmatched leadershipId,  registration with invalid address, duplicate registration
+* @throws Exception
+*/
+   @Test
+   public void testRegisterTaskExecutor() throws Exception {
+   String taskExecutorAddress = "/taskExecutor1";
+   TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+   ResourceID taskExecutorResourceID = ResourceID.generate();
+   rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
+
+   TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+   TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+   highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+   final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+   resourceManager.start();
+   final UUID leaderSessionId = UUID.randomUUID();
+   leaderElectionService.isLeader(leaderSessionId);
+
+   // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
+   UUID differentLeaderSessionID = UUID.randomUUID();
+   Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
+   assertTrue(unMatchedLeaderFuture.isCompleted());
+   assertTrue(unMatchedLeaderFuture.failed().isCompleted());
+   assertTrue(unMatchedLeaderFuture.failed().value().get().get() instanceof UnmatchedLeaderSessionIDException);
--- End diff --

Why not simply call `Await.result` and catch the
`UnmatchedLeaderSessionIDException`?
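A minimal sketch of the suggested test style, assuming `java.util.concurrent.CompletableFuture` as a stand-in for the `scala.concurrent.Future` used in the PR. `Await.result` rethrows the failure cause directly, so a test could catch `UnmatchedLeaderSessionIDException` itself; `CompletableFuture.get()` instead wraps the cause in an `ExecutionException`, so the sketch unwraps it. `SketchResourceManager`, `LeaderMismatchException` and `AwaitStyleCheck` are hypothetical names for illustration only.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for UnmatchedLeaderSessionIDException.
class LeaderMismatchException extends Exception {
    private static final long serialVersionUID = 1L;

    LeaderMismatchException(UUID expected, UUID actual) {
        super("Unmatched leader session ID: expected " + expected + ", actual " + actual);
    }
}

// Hypothetical, heavily simplified resource manager: only the leader check is modelled.
class SketchResourceManager {
    private final UUID leaderSessionID;

    SketchResourceManager(UUID leaderSessionID) {
        this.leaderSessionID = leaderSessionID;
    }

    CompletableFuture<String> registerTaskExecutor(UUID resourceManagerLeaderId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (leaderSessionID.equals(resourceManagerLeaderId)) {
            result.complete("registered");
        } else {
            result.completeExceptionally(
                new LeaderMismatchException(leaderSessionID, resourceManagerLeaderId));
        }
        return result;
    }
}

class AwaitStyleCheck {
    // Blocks on the future (like Await.result) and reports whether it failed with
    // LeaderMismatchException, instead of inspecting failed().value() by hand.
    static boolean failsWithLeaderMismatch(CompletableFuture<?> future) {
        try {
            future.get();
            return false; // completed normally
        } catch (ExecutionException e) {
            return e.getCause() instanceof LeaderMismatchException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside a JUnit test the same shape would typically be a `try { ...; fail(); } catch (...)` block around the blocking call.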


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77337149
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -53,14 +56,23 @@
  */
 public class ResourceManager extends RpcEndpoint {
private final Map jobMasterGateways;
+
+   /** ResourceID and TaskExecutorGateway mapping relationship of registered taskExecutors */
+   private final Map<ResourceID, TaskExecutorGateway> startedTaskExecutorGateways;
+
+   /** TaskExecutorGateway and InstanceId mapping relationship of registered taskExecutors */
+   private final Map<TaskExecutorGateway, InstanceID> taskExecutorGateways;
--- End diff --

Wouldn't it make sense to group the `TaskExecutorGateway` and the 
`InstanceID` into a `TaskExecutorRegistration` class which is stored under the 
resource ID? Then we would get rid of a lookup when accessing the `InstanceID` 
given the resource ID.
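A minimal sketch of the suggested grouping. The `TaskExecutorRegistration` name comes from the comment; its fields and accessors here, and the `TaskExecutorRegistry` wrapper, are hypothetical. `String` and `UUID` stand in for `ResourceID`, `TaskExecutorGateway` and `InstanceID`. With a single map keyed by resource ID, a duplicate registration returns the existing instance ID with one lookup instead of two.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical holder grouping a TaskExecutor's gateway with its instance ID.
final class TaskExecutorRegistration {
    private final String taskExecutorGateway; // stand-in for TaskExecutorGateway
    private final UUID instanceID;            // stand-in for InstanceID

    TaskExecutorRegistration(String taskExecutorGateway, UUID instanceID) {
        this.taskExecutorGateway = taskExecutorGateway;
        this.instanceID = instanceID;
    }

    String getTaskExecutorGateway() {
        return taskExecutorGateway;
    }

    UUID getInstanceID() {
        return instanceID;
    }
}

final class TaskExecutorRegistry {
    // One map keyed by resource ID (String stand-in for ResourceID) replaces the two maps.
    private final Map<String, TaskExecutorRegistration> taskExecutors = new HashMap<>();

    // Registers the TaskExecutor if unknown; a duplicate registration keeps the
    // first gateway and returns the instance ID assigned the first time.
    UUID register(String resourceID, String taskExecutorGateway) {
        return taskExecutors
            .computeIfAbsent(resourceID,
                id -> new TaskExecutorRegistration(taskExecutorGateway, UUID.randomUUID()))
            .getInstanceID();
    }

    TaskExecutorRegistration get(String resourceID) {
        return taskExecutors.get(resourceID);
    }

    int size() {
        return taskExecutors.size();
    }
}
```

Keeping the first gateway on a duplicate registration mirrors the diff's duplicate branch, which only reads the existing mappings.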


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77337006
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 
 
/**
-*
-* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+* Register a taskExecutor at the resource manager
+* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 * @param taskExecutorAddress  The address of the TaskExecutor that registers
 * @param resourceID   The resource ID of the TaskExecutor that registers
 *
 * @return The response by the ResourceManager.
 */
@RpcMethod
-   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-   UUID resourceManagerLeaderId,
-   String taskExecutorAddress,
-   ResourceID resourceID) {
+   public Future<RegistrationResponse> registerTaskExecutor(
+   final UUID resourceManagerLeaderId,
+   final String taskExecutorAddress,
+   final ResourceID resourceID) {
+
+   if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+   log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+   resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+   return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
--- End diff --

Should we fail or decline the registration here? That is, either complete the
future with an exception or send a `RegistrationResponse.Decline` message.
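A minimal sketch of the two options, assuming `CompletableFuture` in place of the Scala futures used in the PR. `RegistrationReply`, `RegistrationAccepted` and `RegistrationDeclined` are hypothetical stand-ins for `RegistrationResponse`, a success response and `RegistrationResponse.Decline`; `IllegalStateException` stands in for `UnmatchedLeaderSessionIDException`.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for RegistrationResponse and its success/decline variants.
abstract class RegistrationReply {}

final class RegistrationAccepted extends RegistrationReply {
    final UUID instanceID;

    RegistrationAccepted(UUID instanceID) {
        this.instanceID = instanceID;
    }
}

final class RegistrationDeclined extends RegistrationReply {
    final String reason;

    RegistrationDeclined(String reason) {
        this.reason = reason;
    }
}

final class LeaderIdCheck {
    private static String mismatch(UUID expected, UUID received) {
        return "Unmatched leader session ID: expected " + expected + ", actual " + received;
    }

    // Option 1: fail the future, so the caller sees an exception.
    static CompletableFuture<RegistrationReply> failOnMismatch(UUID expected, UUID received) {
        CompletableFuture<RegistrationReply> reply = new CompletableFuture<>();
        if (expected.equals(received)) {
            reply.complete(new RegistrationAccepted(UUID.randomUUID()));
        } else {
            reply.completeExceptionally(new IllegalStateException(mismatch(expected, received)));
        }
        return reply;
    }

    // Option 2: complete normally with a decline message that carries the reason.
    static CompletableFuture<RegistrationReply> declineOnMismatch(UUID expected, UUID received) {
        RegistrationReply reply = expected.equals(received)
            ? new RegistrationAccepted(UUID.randomUUID())
            : new RegistrationDeclined(mismatch(expected, received));
        return CompletableFuture.completedFuture(reply);
    }
}
```

One consideration, offered as a judgment rather than a rule: a decline keeps the mismatch inside the registration protocol, so the TaskExecutor can tell a rejected registration apart from a transport or timeout failure.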


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77336702
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 
 
/**
-*
-* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+* Register a taskExecutor at the resource manager
+* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 * @param taskExecutorAddress  The address of the TaskExecutor that registers
 * @param resourceID   The resource ID of the TaskExecutor that registers
 *
 * @return The response by the ResourceManager.
 */
@RpcMethod
-   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-   UUID resourceManagerLeaderId,
-   String taskExecutorAddress,
-   ResourceID resourceID) {
+   public Future<RegistrationResponse> registerTaskExecutor(
+   final UUID resourceManagerLeaderId,
+   final String taskExecutorAddress,
+   final ResourceID resourceID) {
+
+   if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+   log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+   resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+   return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+   }
+
+   Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+   return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
-   return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
+   @Override
+   public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
+   InstanceID instanceID = null;
+   if(startedTaskExecutorGateways.containsKey(resourceID)) {
+   log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+   instanceID = taskExecutorGateways.get(startedTaskExecutorGateways.get(resourceID));
+   } else {
+   startedTaskExecutorGateways.put(resourceID, taskExecutorGateway);
+   instanceID = new InstanceID();
+   taskExecutorGateways.put(taskExecutorGateway, instanceID);
+   }
--- End diff --

line break missing


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77336516
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 
 
/**
-*
-* @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
+* Register a taskExecutor at the resource manager
--- End diff --

Mixed camel and non-camel case. Furthermore, the `taskExecutor` is not the 
class name. Line break missing.


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335982
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
+
+   private static final long serialVersionUID = -3276145308053264636L;
+
+   /** expected leader session id */
+   private final UUID expectedLeaderSessionID;
+
+   /** actual leader session id */
+   private final UUID actualLeaderSessionID;
+
+   public UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+   super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+   this.expectedLeaderSessionID = expectedLeaderSessionID;
+   this.actualLeaderSessionID = actualLeaderSessionID;
+   }
+
+   /**
+* Get expect leader session id
--- End diff --

"expected"


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335961
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
+
+   private static final long serialVersionUID = -3276145308053264636L;
+
+   /** expected leader session id */
+   private final UUID expectedLeaderSessionID;
+
+   /** actual leader session id */
+   private final UUID actualLeaderSessionID;
+
+   public UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+   super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+   this.expectedLeaderSessionID = expectedLeaderSessionID;
--- End diff --

`checkNotNull`
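A minimal sketch of the constructor with null checks. It uses `java.util.Objects.requireNonNull` as a self-contained stand-in for Flink's `Preconditions.checkNotNull`, which the comment presumably refers to. Checking the actual ID as well as the expected one is an assumption; since the actual ID arrives from a remote caller, rejecting `null` earlier may be preferable.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * An exception specifying that the received leader session ID is not the same as the expected one.
 */
class UnmatchedLeaderSessionIDException extends Exception {

    private static final long serialVersionUID = -3276145308053264636L;

    /** Expected leader session ID. */
    private final UUID expectedLeaderSessionID;

    /** Actual leader session ID. */
    private final UUID actualLeaderSessionID;

    UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
        super("Unmatched leader session ID: expected " + expectedLeaderSessionID
            + ", actual " + actualLeaderSessionID);
        // Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull.
        this.expectedLeaderSessionID =
            Objects.requireNonNull(expectedLeaderSessionID, "expectedLeaderSessionID");
        this.actualLeaderSessionID =
            Objects.requireNonNull(actualLeaderSessionID, "actualLeaderSessionID");
    }

    /** Get the expected leader session ID. */
    UUID getExpectedLeaderSessionID() {
        return expectedLeaderSessionID;
    }

    /** Get the actual leader session ID. */
    UUID getActualLeaderSessionID() {
        return actualLeaderSessionID;
    }
}
```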


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335925
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
--- End diff --

I think it's ok to name this exception `LeaderSessionIDException`


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77335889
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
--- End diff --

"that the received", "is not the same as"


[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-08-31 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2451

[FLINK-4535] [cluster management] resourceManager process the registration from TaskExecutor

This pull request implements the handling of TaskExecutor registrations at the ResourceManager, which includes:
1. Check whether the given resourceManagerLeaderId is the same as the current leader session ID of the ResourceManager. If not, two or more ResourceManagers may exist at the same time and the current one is not the proper leader, so it rejects or ignores the registration.
2. Check whether a valid TaskExecutor exists at the given address by connecting to it, and reject registrations from invalid addresses (this check happens inside the connect method).
3. Keep the mapping between resourceID and taskExecutorGateway and, optionally, the mapping between resourceID and container in YARN mode.
4. Send a registration success acknowledgement to the TaskExecutor.
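The four steps above can be sketched as a simplified model under stated assumptions, not the PR's code: resource IDs and gateways are plain `String`s, a caller-supplied `connect` function stands in for `RpcService#connect`, `CompletableFuture` replaces the Scala futures, `IllegalStateException` stands in for `UnmatchedLeaderSessionIDException` (failing rather than declining, one of the two options discussed in review), and the optional YARN container mapping is omitted. `RegistrationFlowSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of the registration steps; not thread-safe.
final class RegistrationFlowSketch {
    private final UUID leaderSessionID;
    // Stand-in for RpcService#connect: resolves a gateway for an address or fails.
    private final Function<String, CompletableFuture<String>> connect;
    // Step 3 state: resource ID -> gateway (String stand-ins for ResourceID and TaskExecutorGateway).
    private final Map<String, String> gatewaysByResourceID = new HashMap<>();

    RegistrationFlowSketch(UUID leaderSessionID, Function<String, CompletableFuture<String>> connect) {
        this.leaderSessionID = leaderSessionID;
        this.connect = connect;
    }

    CompletableFuture<String> registerTaskExecutor(
            UUID resourceManagerLeaderId, String address, String resourceID) {
        // Step 1: reject registrations addressed to a different leader session.
        if (!leaderSessionID.equals(resourceManagerLeaderId)) {
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException(
                "Unmatched leader session ID: expected " + leaderSessionID
                    + ", actual " + resourceManagerLeaderId));
            return rejected;
        }
        // Step 2: connecting validates the address; a failed connect fails the registration.
        return connect.apply(address).thenApply(gateway -> {
            // Step 3: remember the mapping; a duplicate registration keeps the first gateway.
            gatewaysByResourceID.putIfAbsent(resourceID, gateway);
            // Step 4: acknowledge the registration.
            return "registered " + resourceID;
        });
    }

    boolean isRegistered(String resourceID) {
        return gatewaysByResourceID.containsKey(resourceID);
    }
}
```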

The main changes are:
1. Add UnmatchedLeaderSessionIDException to signal that the received leader session ID is not the same as the expected one.
2. Change the registerTaskExecutor method of ResourceManager.
3. Add a test class for ResourceManager.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4535

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2451


commit fa795ca7a992859398ed30180e50ef036a93b355
Author: beyond1920 
Date:   2016-09-01T03:14:00Z

resourceManager process the registration from TaskExecutor



