[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2451 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77757943
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 	/**
-	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * Register a taskExecutor at the resource manager
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers
 	 * @param resourceID The resource ID of the TaskExecutor that registers
 	 *
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-		UUID resourceManagerLeaderId,
-		String taskExecutorAddress,
-		ResourceID resourceID) {
+	public Future<RegistrationResponse> registerTaskExecutor(
+		final UUID resourceManagerLeaderId,
+		final String taskExecutorAddress,
+		final ResourceID resourceID) {
+
+		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
+				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+			return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
--- End diff --

Would you prefer to send a Decline message in this case?
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77339204
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerTest.java ---
@@ -0,0 +1,85 @@
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.rpc.UnmatchedLeaderSessionIDException;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerTest {
+
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService();
+	}
+
+	/**
+	 * Test registerTaskExecutor, including normal registration, registration with unmatched leadershipId, registration with invalid address, duplicate registration
+	 * @throws Exception
+	 */
+	@Test
+	public void testRegisterTaskExecutor() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		ResourceID taskExecutorResourceID = ResourceID.generate();
+		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
+
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+		resourceManager.start();
+		final UUID leaderSessionId = UUID.randomUUID();
+		leaderElectionService.isLeader(leaderSessionId);
+
+		// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
+		assertTrue(unMatchedLeaderFuture.isCompleted());
+		assertTrue(unMatchedLeaderFuture.failed().isCompleted());
+		assertTrue(unMatchedLeaderFuture.failed().value().get().get() instanceof UnmatchedLeaderSessionIDException);
--- End diff --

Why not simply use `Await.result` and catch the `UnmatchedLeaderSessionIDException`?
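For comparison, the reviewer's suggestion (block on the future and catch the exception instead of inspecting `failed().value()`) can be sketched in plain Java. This sketch assumes `java.util.concurrent.CompletableFuture` in place of the `scala.concurrent.Future` used in the PR. The exception class and the `register` method below are minimal hypothetical stand-ins, not the PR's classes. Note that `join()` wraps the failure in a `CompletionException`, whereas Scala's `Await.result` rethrows the original exception, so the sketch unwraps the cause.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the UnmatchedLeaderSessionIDException added in this PR.
class UnmatchedLeaderSessionIDException extends Exception {
    private static final long serialVersionUID = 1L;

    UnmatchedLeaderSessionIDException(UUID expected, UUID actual) {
        super("Unmatched leader session ID: expected " + expected + ", actual " + actual);
    }
}

class AwaitFailureSketch {

    // Hypothetical registration call: fails the future when the leader session IDs differ.
    static CompletableFuture<String> register(UUID expected, UUID received) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (expected.equals(received)) {
            result.complete("registered");
        } else {
            result.completeExceptionally(new UnmatchedLeaderSessionIDException(expected, received));
        }
        return result;
    }

    // Blocks on the future (like Await.result) and checks the type of the failure cause.
    static boolean failsWithUnmatchedLeader(CompletableFuture<?> future) {
        try {
            future.join();
            return false; // completed normally, so no exception to catch
        } catch (CompletionException e) {
            // join() wraps the original exception; Await.result would rethrow it directly.
            return e.getCause() instanceof UnmatchedLeaderSessionIDException;
        }
    }

    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        System.out.println(failsWithUnmatchedLeader(register(leader, UUID.randomUUID()))); // true
        System.out.println(failsWithUnmatchedLeader(register(leader, leader)));            // false
    }
}
```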
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77337149
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -53,14 +56,23 @@
  */
 public class ResourceManager extends RpcEndpoint {
 	private final Map jobMasterGateways;
+
+	/** ResourceID and TaskExecutorGateway mapping relationship of registered taskExecutors */
+	private final Map<ResourceID, TaskExecutorGateway> startedTaskExecutorGateways;
+
+	/** TaskExecutorGateway and InstanceId mapping relationship of registered taskExecutors */
+	private final Map<TaskExecutorGateway, InstanceID> taskExecutorGateways;
--- End diff --

Wouldn't it make sense to group the `TaskExecutorGateway` and the `InstanceID` into a `TaskExecutorRegistration` class which is stored under the resource ID? Then we would get rid of a lookup when accessing the `InstanceID` given the resource ID.
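A minimal sketch of the suggested grouping follows. It relies on hypothetical stand-ins: a `String` for `ResourceID`, a `UUID` for `InstanceID`, and an empty `TaskExecutorGateway` interface. Only the `TaskExecutorRegistration` name comes from the comment; its fields and methods are illustrative. With one map keyed by resource ID, a duplicate registration needs a single lookup to find the existing instance ID instead of two.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for Flink's TaskExecutorGateway RPC interface.
interface TaskExecutorGateway {}

// Groups the gateway with its instance ID, as suggested in the review.
final class TaskExecutorRegistration {
    private final TaskExecutorGateway gateway;
    private final UUID instanceId; // UUID stands in for Flink's InstanceID

    TaskExecutorRegistration(TaskExecutorGateway gateway, UUID instanceId) {
        this.gateway = Objects.requireNonNull(gateway);
        this.instanceId = Objects.requireNonNull(instanceId);
    }

    TaskExecutorGateway getGateway() {
        return gateway;
    }

    UUID getInstanceId() {
        return instanceId;
    }
}

class TaskExecutorRegistry {
    // One map keyed by resource ID (a String stands in for ResourceID) replaces the two maps.
    private final Map<String, TaskExecutorRegistration> taskExecutors = new HashMap<>();

    // Registers the executor, or returns the existing instance ID on a duplicate registration.
    UUID register(String resourceId, TaskExecutorGateway gateway) {
        return taskExecutors
            .computeIfAbsent(resourceId, id -> new TaskExecutorRegistration(gateway, UUID.randomUUID()))
            .getInstanceId();
    }

    int size() {
        return taskExecutors.size();
    }

    public static void main(String[] args) {
        TaskExecutorRegistry registry = new TaskExecutorRegistry();
        TaskExecutorGateway gateway = new TaskExecutorGateway() {};
        UUID first = registry.register("taskExecutor-1", gateway);
        UUID second = registry.register("taskExecutor-1", gateway);
        System.out.println(first.equals(second)); // true
        System.out.println(registry.size());      // 1
    }
}
```

`computeIfAbsent` keeps the first registration on a duplicate, which matches the duplicate-registration branch in the diff under review (the existing instance ID is returned and nothing is overwritten).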
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77337006
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 	/**
-	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * Register a taskExecutor at the resource manager
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers
 	 * @param resourceID The resource ID of the TaskExecutor that registers
 	 *
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-		UUID resourceManagerLeaderId,
-		String taskExecutorAddress,
-		ResourceID resourceID) {
+	public Future<RegistrationResponse> registerTaskExecutor(
+		final UUID resourceManagerLeaderId,
+		final String taskExecutorAddress,
+		final ResourceID resourceID) {
+
+		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
+				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+			return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
--- End diff --

Should we fail or decline the registration here? That is, either send an exception or a `RegistrationResponse.Decline` message.
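The two options can be contrasted in a small sketch. `RegistrationResponse`, `Success` and `Decline` below are hypothetical stand-ins modelled on the names in the comment (their fields are assumptions), and `CompletableFuture` replaces the Scala/Akka futures used in the PR.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for RegistrationResponse and its subtypes; the fields are assumptions.
abstract class RegistrationResponse {
    static final class Success extends RegistrationResponse {
        final UUID instanceId;

        Success(UUID instanceId) {
            this.instanceId = instanceId;
        }
    }

    static final class Decline extends RegistrationResponse {
        final String reason;

        Decline(String reason) {
            this.reason = reason;
        }
    }
}

class LeaderCheckSketch {

    // Option 1: signal the mismatch as a failed future (what the diff currently does).
    static CompletableFuture<RegistrationResponse> failOnMismatch(UUID expected, UUID received) {
        if (!expected.equals(received)) {
            CompletableFuture<RegistrationResponse> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                "Unmatched leader session ID: expected " + expected + ", actual " + received));
            return failed;
        }
        return CompletableFuture.completedFuture(new RegistrationResponse.Success(UUID.randomUUID()));
    }

    // Option 2: complete normally with a Decline message that the caller inspects.
    static CompletableFuture<RegistrationResponse> declineOnMismatch(UUID expected, UUID received) {
        if (!expected.equals(received)) {
            return CompletableFuture.completedFuture(new RegistrationResponse.Decline(
                "Unmatched leader session ID: expected " + expected + ", actual " + received));
        }
        return CompletableFuture.completedFuture(new RegistrationResponse.Success(UUID.randomUUID()));
    }

    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        UUID stale = UUID.randomUUID();
        System.out.println(failOnMismatch(leader, stale).isCompletedExceptionally()); // true
        System.out.println(declineOnMismatch(leader, stale).join() instanceof RegistrationResponse.Decline); // true
    }
}
```

One trade-off, offered as a judgment rather than a decision from this thread: a `Decline` is an ordinary protocol answer that the TaskExecutor's registration logic can handle explicitly, while a failed future looks like any other failure (for example a lost connection) unless the caller inspects the exception type.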
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77336702
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 	/**
-	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * Register a taskExecutor at the resource manager
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress The address of the TaskExecutor that registers
 	 * @param resourceID The resource ID of the TaskExecutor that registers
 	 *
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-		UUID resourceManagerLeaderId,
-		String taskExecutorAddress,
-		ResourceID resourceID) {
+	public Future<RegistrationResponse> registerTaskExecutor(
+		final UUID resourceManagerLeaderId,
+		final String taskExecutorAddress,
+		final ResourceID resourceID) {
+
+		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
+				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+			return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+		}
+
+		Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+
+		return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
-		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
+			@Override
+			public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
+				InstanceID instanceID = null;
+				if(startedTaskExecutorGateways.containsKey(resourceID)) {
+					log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+					instanceID = taskExecutorGateways.get(startedTaskExecutorGateways.get(resourceID));
+				} else {
+					startedTaskExecutorGateways.put(resourceID, taskExecutorGateway);
+					instanceID = new InstanceID();
+					taskExecutorGateways.put(taskExecutorGateway, instanceID);
+				}
--- End diff --

Line break missing.
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77336516
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
 	/**
-	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * Register a taskExecutor at the resource manager
--- End diff --

Mixed camel and non-camel case. Furthermore, `taskExecutor` is not the class name. Line break missing.
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77335982
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
+
+	private static final long serialVersionUID = -3276145308053264636L;
+
+	/** expected leader session id */
+	private final UUID expectedLeaderSessionID;
+
+	/** actual leader session id */
+	private final UUID actualLeaderSessionID;
+
+	public UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+		super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+		this.expectedLeaderSessionID = expectedLeaderSessionID;
+		this.actualLeaderSessionID = actualLeaderSessionID;
+	}
+
+	/**
+	 * Get expect leader session id
--- End diff --

"expected"
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77335961
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
+
+	private static final long serialVersionUID = -3276145308053264636L;
+
+	/** expected leader session id */
+	private final UUID expectedLeaderSessionID;
+
+	/** actual leader session id */
+	private final UUID actualLeaderSessionID;
+
+	public UnmatchedLeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+		super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+		this.expectedLeaderSessionID = expectedLeaderSessionID;
--- End diff --

`checkNotNull`
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77335925
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
+ */
+public class UnmatchedLeaderSessionIDException extends Exception {
--- End diff --

I think it's OK to name this exception `LeaderSessionIDException`.
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77335889
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/UnmatchedLeaderSessionIDException.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.util.UUID;
+
+/**
+ * An exception specifying that received leader session ID is not as same as expected.
--- End diff --

"that the received", "is not the same as"
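The review comments on this class can be applied together: rename it to `LeaderSessionIDException`, use "the received" and "is not the same as", write "expected", and add `checkNotNull`. A revised version might then look like the sketch below. `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` so that the snippet compiles on its own. The getter names are hypothetical because the diff cuts off before them, and the `serialVersionUID` is a placeholder. The class is package-private here only so the example fits in one file.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * An exception specifying that the received leader session ID is not the same as the expected one.
 */
class LeaderSessionIDException extends Exception {

    private static final long serialVersionUID = 1L; // placeholder value for this sketch

    /** Expected leader session ID. */
    private final UUID expectedLeaderSessionID;

    /** Actual leader session ID. */
    private final UUID actualLeaderSessionID;

    LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
        super("Unmatched leader session ID: expected " + expectedLeaderSessionID
            + ", actual " + actualLeaderSessionID);
        // Objects.requireNonNull plays the role of Flink's Preconditions.checkNotNull here.
        this.expectedLeaderSessionID = Objects.requireNonNull(expectedLeaderSessionID);
        this.actualLeaderSessionID = Objects.requireNonNull(actualLeaderSessionID);
    }

    /** Gets the expected leader session ID (hypothetical getter name). */
    UUID getExpectedLeaderSessionID() {
        return expectedLeaderSessionID;
    }

    /** Gets the actual leader session ID (hypothetical getter name). */
    UUID getActualLeaderSessionID() {
        return actualLeaderSessionID;
    }
}

class LeaderSessionIDExceptionDemo {
    public static void main(String[] args) {
        LeaderSessionIDException e = new LeaderSessionIDException(UUID.randomUUID(), UUID.randomUUID());
        System.out.println(e.getMessage());
    }
}
```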
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2451

[FLINK-4535] [cluster management] resourceManager process the registration from TaskExecutor

This pull request implements how the ResourceManager processes a registration from a TaskExecutor, which includes:
1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the resourceManager. If not, two or more resourceManagers may exist at the same time and the current resourceManager is not the proper one, so it rejects or ignores the registration.
2. Check whether a valid taskExecutor exists at the given address by connecting to that address, and reject registrations from invalid addresses (this check is hidden in the connect method).
3. Keep the mapping from resourceID to taskExecutorGateway and, in YARN mode, optionally the mapping from resourceID to container.
4. Send a registration-success acknowledgement to the taskExecutor.

The main changes are:
1. Add UnmatchedLeaderSessionIDException to signal that the received leader session ID is not the same as the expected one.
2. Change the registerTaskExecutor method of ResourceManager.
3. Add a test class for ResourceManager.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/alibaba/flink jira-4535

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2451.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2451

commit fa795ca7a992859398ed30180e50ef036a93b355
Author: beyond1920
Date: 2016-09-01T03:14:00Z
resourceManager process the registration from TaskExecutor
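The four registration steps listed in the pull request description can be modelled in a self-contained, single-threaded sketch. Everything in it is hypothetical. `String`s stand in for `ResourceID` and the gateway, a `UUID` stands in for `InstanceID`, and a `Function` returning a `CompletableFuture` stands in for `getRpcService().connect(...)`. Like the PR, the sketch fails rather than declines on a leader session ID mismatch, and it omits the optional YARN container mapping from step 3.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, single-threaded model of the four registration steps; not Flink's API.
class RegistrationFlowSketch {

    private final UUID leaderSessionId;
    // Step 2 stand-in: resolves an address to a "gateway" (a String here) or fails.
    private final Function<String, CompletableFuture<String>> connect;
    // Step 3: resource ID -> instance ID; a real registry would also keep the gateway.
    private final Map<String, UUID> instanceIds = new HashMap<>();

    RegistrationFlowSketch(UUID leaderSessionId, Function<String, CompletableFuture<String>> connect) {
        this.leaderSessionId = leaderSessionId;
        this.connect = connect;
    }

    CompletableFuture<UUID> registerTaskExecutor(UUID leaderId, String address, String resourceId) {
        // Step 1: reject registrations that carry a different leader session ID.
        if (!leaderSessionId.equals(leaderId)) {
            CompletableFuture<UUID> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException("Unmatched leader session ID"));
            return rejected;
        }
        // Step 2: a failed connection makes the returned future fail as well.
        return connect.apply(address).thenApply(gateway ->
            // Steps 3 and 4: record the executor (reusing the ID on duplicates) and acknowledge with it.
            instanceIds.computeIfAbsent(resourceId, id -> UUID.randomUUID()));
    }

    // Hypothetical connector: only liveAddress resolves; any other address fails.
    static Function<String, CompletableFuture<String>> connectorFor(String liveAddress) {
        return address -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (address.equals(liveAddress)) {
                future.complete("gateway@" + address);
            } else {
                future.completeExceptionally(new IllegalArgumentException("No TaskExecutor at " + address));
            }
            return future;
        };
    }

    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        RegistrationFlowSketch rm = new RegistrationFlowSketch(leader, connectorFor("/taskExecutor1"));
        UUID first = rm.registerTaskExecutor(leader, "/taskExecutor1", "res-1").join();
        UUID again = rm.registerTaskExecutor(leader, "/taskExecutor1", "res-1").join();
        System.out.println(first.equals(again)); // true
        System.out.println(rm.registerTaskExecutor(UUID.randomUUID(), "/taskExecutor1", "res-1").isCompletedExceptionally()); // true
        System.out.println(rm.registerTaskExecutor(leader, "/unknown", "res-2").isCompletedExceptionally()); // true
    }
}
```

Because the demo connector completes its futures synchronously, the `HashMap` is only touched from one thread. The sketch does not address how the real `RpcEndpoint` confines access to its state.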