[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm closed the pull request at: https://github.com/apache/flink/pull/2463 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r78212144

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
      * RPC's main thread to avoid race condition).
      *
      * @param request The detailed request of the slot
    + * @return SlotRequestRegistered The confirmation message to be send to the caller
      */
    - public void requestSlot(final SlotRequest request) {
    + public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +     final AllocationID allocationId = request.getAllocationId();
          if (isRequestDuplicated(request)) {
    -         LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -         return;
    +         LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +         return null;
          }
          // try to fulfil the request with current free slots
    -     ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +     final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
          if (slot != null) {
              LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -             request.getAllocationId(), request.getJobId());
    +             allocationId, request.getJobId());
              // record this allocation in bookkeeping
    -         allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +         allocationMap.addAllocation(slot.getSlotId(), allocationId);
              // remove selected slot from free pool
              freeSlots.remove(slot.getSlotId());
    -         // TODO: send slot request to TaskManager
    +         slot.getTaskExecutorGateway()
    +             .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --

Thank you for your comments @beyond1920. Your observations are correct. I've skipped this part of the implementation and wanted to address it next.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r78194763

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;

    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;

    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

These are valid points, I will change the code to use the `LeaderRetrievalListener` instead.
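To illustrate what a listener-based approach might look like, here is a minimal sketch in which components are pushed the new leader session ID instead of reading it from a shared registry. All names (`LeaderChangeListener`, `Notifier`, `SlotBookkeeping`) are hypothetical stand-ins invented for this example; the sketch does not reproduce Flink's `LeaderRetrievalListener` API, and treating a `null` ID as "leadership lost" is an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch; none of these types exist in the pull request. */
final class LeaderNotificationSketch {

    /** Callback invoked on every leader change; null is assumed to mean "leadership lost". */
    interface LeaderChangeListener {
        void onLeaderChanged(UUID newLeaderSessionId);
    }

    /** Owner side: the only place that can publish a new leader session ID. */
    static final class Notifier {
        private final List<LeaderChangeListener> listeners = new ArrayList<>();

        void register(LeaderChangeListener listener) {
            listeners.add(listener);
        }

        void publish(UUID newLeaderSessionId) {
            for (LeaderChangeListener listener : listeners) {
                listener.onLeaderChanged(newLeaderSessionId);
            }
        }
    }

    /** Stand-in for a slot manager that frees its slots when leadership is lost. */
    static final class SlotBookkeeping implements LeaderChangeListener {
        private final Set<String> registeredSlots = new HashSet<>();
        private UUID leaderSessionId;

        void registerSlot(String slotId) {
            registeredSlots.add(slotId);
        }

        int slotCount() {
            return registeredSlots.size();
        }

        UUID currentLeader() {
            return leaderSessionId;
        }

        @Override
        public void onLeaderChanged(UUID newLeaderSessionId) {
            leaderSessionId = newLeaderSessionId;
            if (newLeaderSessionId == null) {
                registeredSlots.clear(); // leadership lost: drop all bookkeeping
            }
        }
    }

    public static void main(String[] args) {
        Notifier notifier = new Notifier();
        SlotBookkeeping slots = new SlotBookkeeping();
        notifier.register(slots);

        notifier.publish(UUID.randomUUID());
        slots.registerSlot("slot-1");
        System.out.println("slots while leader: " + slots.slotCount());

        notifier.publish(null);
        System.out.println("slots after losing leadership: " + slots.slotCount());
    }
}
```

With this shape, components never hold a handle that lets them change the leader ID; they only receive it.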
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77970827

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    - private final Map jobMasterGateways;
    +
    + private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

Good point. Will use that one instead.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77943543

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    - private final Map jobMasterGateways;
    +
    + private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

There is a `log` field in `RpcEndpoint`, which is protected. Why not use that instead?
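The idea of reusing a protected logger from the base class can be sketched as follows. This is only an illustration under stated assumptions: `EndpointBase` and `ManagerEndpoint` are hypothetical stand-ins for `RpcEndpoint` and `ResourceManager`, and `java.util.logging` is swapped in for SLF4J so that the example has no external dependencies.

```java
import java.util.logging.Logger;

/** Hypothetical sketch; the class names are invented for illustration. */
final class InheritedLoggerSketch {

    /** Stand-in for a base endpoint that exposes one logger to all subclasses. */
    abstract static class EndpointBase {
        // getClass() resolves to the concrete subclass, so each subclass logs under its own name.
        protected final Logger log = Logger.getLogger(getClass().getName());
    }

    /** Stand-in for a subclass that reuses the inherited logger instead of declaring its own. */
    static final class ManagerEndpoint extends EndpointBase {
        String loggerName() {
            return log.getName();
        }

        void start() {
            log.info("endpoint started");
        }
    }

    public static void main(String[] args) {
        ManagerEndpoint endpoint = new ManagerEndpoint();
        endpoint.start();
        System.out.println(endpoint.loggerName());
    }
}
```

Because the field is initialized with `getClass()`, the subclass gets a correctly named logger without a second field.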
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77787126

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;

    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;

    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

The class has some docs, but as you can see from my initial question, its purpose was not clear to me.

Yes, I actually thought about marking `leaderSessionID` `volatile`. Given the interface of this class, every component which has a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID.

I'm actually wondering whether it is not necessary to notify the components about a new leader session ID. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable to transmit the current leader session ID?
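One way to address the read-only concern could be to split the registry into a writable holder and a read-only view, roughly as in the sketch below. The names `LeaderIdView` and `LeaderIdHolder` are hypothetical and do not appear in the pull request; the `volatile` field is there so that a write by the owner thread is visible to other threads that read it later.

```java
import java.util.UUID;

/** Hypothetical sketch; these types are invented for illustration. */
final class LeaderIdSketch {

    /** Read-only view handed to components that may only retrieve the leader ID. */
    interface LeaderIdView {
        UUID getLeaderId();
    }

    /** Writable holder kept by the owner (for example a resource manager). */
    static final class LeaderIdHolder implements LeaderIdView {
        // volatile: a reader thread sees the most recent write without extra locking
        private volatile UUID leaderId;

        @Override
        public UUID getLeaderId() {
            return leaderId;
        }

        void setLeaderId(UUID newLeaderId) {
            leaderId = newLeaderId;
        }
    }

    public static void main(String[] args) {
        LeaderIdHolder holder = new LeaderIdHolder();
        LeaderIdView view = holder; // other components receive only this type

        holder.setLeaderId(UUID.randomUUID());
        System.out.println("leader seen through the view: " + view.getLeaderId());
    }
}
```

A component that receives a `LeaderIdView` has no setter to call, so only the owner can change the ID.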
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77783497

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;

    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;

    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

What exactly do you mean? The class is thread-safe and documented (though documentation can be improved). There is no need for locking. Do you mean marking the leaderSessionID `volatile`? It should be fine if leader changes propagate lazily.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77769044

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
      * RPC's main thread to avoid race condition).
      *
      * @param request The detailed request of the slot
    + * @return SlotRequestRegistered The confirmation message to be send to the caller
      */
    - public void requestSlot(final SlotRequest request) {
    + public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +     final AllocationID allocationId = request.getAllocationId();
          if (isRequestDuplicated(request)) {
    -         LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -         return;
    +         LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +         return null;
          }
          // try to fulfil the request with current free slots
    -     ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +     final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
          if (slot != null) {
              LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -             request.getAllocationId(), request.getJobId());
    +             allocationId, request.getJobId());
              // record this allocation in bookkeeping
    -         allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +         allocationMap.addAllocation(slot.getSlotId(), allocationId);
              // remove selected slot from free pool
              freeSlots.remove(slot.getSlotId());
    -         // TODO: send slot request to TaskManager
    +         slot.getTaskExecutorGateway()
    +             .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --

There are three possible responses from the TaskExecutor:

1. Ack: the TaskExecutor gives the slot to the specified JobMaster as expected.
2. Decline: the slot is already occupied by another AllocationID.
3. Timeout: this could be caused by a lost request or response message, or by slow network transfer.

In the first case, the SlotManager needs to do nothing. In the second and third cases, however, the SlotManager will first verify and clear all previous allocation information for this slot request, and then try again to find a suitable slot for it. I think we should add logic to handle these three possible responses from the TaskExecutor.
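The three cases described above (ack, decline, timeout) can be sketched as a small state-handling routine. This is a simplified illustration and not the SlotManager implementation: `SlotResponseSketch`, `Outcome`, `allocate`, and `onResponse` are hypothetical names, slot and allocation IDs are plain strings, and a declined or timed-out slot is assumed not to return to the free pool until it is reported as free again.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; all names are invented for illustration. */
final class SlotResponseSketch {

    /** The three possible outcomes of a slot request sent to a task executor. */
    enum Outcome { ACK, DECLINE, TIMEOUT }

    // allocationId -> slotId for requests that have been sent out
    private final Map<String, String> allocations = new HashMap<>();
    private final Deque<String> freeSlots = new ArrayDeque<>();

    SlotResponseSketch(List<String> initialFreeSlots) {
        freeSlots.addAll(initialFreeSlots);
    }

    /** Takes the next free slot and records the allocation; returns null if no slot is free. */
    String allocate(String allocationId) {
        String slotId = freeSlots.pollFirst();
        if (slotId != null) {
            allocations.put(allocationId, slotId);
        }
        return slotId;
    }

    /** Returns the slot now assigned to the allocation, or null if the request stays pending. */
    String onResponse(String allocationId, Outcome outcome) {
        if (outcome == Outcome.ACK) {
            return allocations.get(allocationId); // nothing else to do
        }
        // DECLINE or TIMEOUT: clear the previous bookkeeping, then try another slot.
        allocations.remove(allocationId);
        return allocate(allocationId);
    }

    public static void main(String[] args) {
        SlotResponseSketch manager = new SlotResponseSketch(Arrays.asList("slot-1", "slot-2"));
        System.out.println("first attempt: " + manager.allocate("alloc-A"));
        System.out.println("after decline: " + manager.onResponse("alloc-A", Outcome.DECLINE));
        System.out.println("after ack: " + manager.onResponse("alloc-A", Outcome.ACK));
    }
}
```

Handling decline and timeout through the same path keeps the bookkeeping consistent, though a real implementation would probably distinguish them, since after a timeout the state of the slot is unknown.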
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77768256

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
      * RPC's main thread to avoid race condition).
      *
      * @param request The detailed request of the slot
    + * @return SlotRequestRegistered The confirmation message to be send to the caller
      */
    - public void requestSlot(final SlotRequest request) {
    + public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +     final AllocationID allocationId = request.getAllocationId();
          if (isRequestDuplicated(request)) {
    -         LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -         return;
    +         LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +         return null;
          }
          // try to fulfil the request with current free slots
    -     ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +     final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
          if (slot != null) {
              LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -             request.getAllocationId(), request.getJobId());
    +             allocationId, request.getJobId());
              // record this allocation in bookkeeping
    -         allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +         allocationMap.addAllocation(slot.getSlotId(), allocationId);
              // remove selected slot from free pool
              freeSlots.remove(slot.getSlotId());
    -         // TODO: send slot request to TaskManager
    +         slot.getTaskExecutorGateway()
    +             .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --

ResourceManager keeps a relationship between resourceID and TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID using ResourceManager here?
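The lookup suggested here, fetching a gateway by resource ID from a central registry instead of storing it on each slot, might look like the following sketch. `GatewayLookupSketch`, `ExecutorGateway`, `registerExecutor`, and `lookupGateway` are hypothetical names chosen for illustration, and resource IDs are plain strings rather than Flink's `ResourceID` type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch; all names are invented for illustration. */
final class GatewayLookupSketch {

    /** Minimal stand-in for a task executor gateway. */
    interface ExecutorGateway {
        String requestSlot(String allocationId);
    }

    // resourceId -> gateway, maintained by the resource manager on registration
    private final Map<String, ExecutorGateway> gatewaysByResourceId = new HashMap<>();

    void registerExecutor(String resourceId, ExecutorGateway gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    /** Returns the gateway for the given resource, or empty if it is not registered. */
    Optional<ExecutorGateway> lookupGateway(String resourceId) {
        return Optional.ofNullable(gatewaysByResourceId.get(resourceId));
    }

    public static void main(String[] args) {
        GatewayLookupSketch resourceManager = new GatewayLookupSketch();
        resourceManager.registerExecutor("tm-1", allocationId -> "requested " + allocationId);

        resourceManager.lookupGateway("tm-1")
            .ifPresent(gateway -> System.out.println(gateway.requestSlot("alloc-A")));
        System.out.println("tm-2 registered: " + resourceManager.lookupGateway("tm-2").isPresent());
    }
}
```

Returning an `Optional` makes the caller deal with the case where the executor has unregistered between slot selection and the request.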
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77651399

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    - private final Map jobMasterGateways;
    +
    + private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

But then this field should probably be marked as `protected` instead of `private`.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77651201 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + try { + Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + } catch (Exception e) { + Assert.fail("JobManager registration Future didn't b
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77650487

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    --- End diff --

Should extend `TestLogger`
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77650617 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + try { + Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + } catch (Exception e) { + Assert.fail("JobManager registration Future didn't b
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77650313

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;

    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;

    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

Alright, but then this class should be made thread safe and the docs should state the purpose.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77630918 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + try { + Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + } catch (Exception e) { + Assert.fail("JobManager registration Future didn't become rea
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77630351

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r7763

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
         // void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +    /**
    +     * Send by the ResourceManager to the TaskExecutor
    +     * @param allocationID id for the request
    +     * @param resourceManagerLeaderID current leader id of the ResourceManager
    +     */
    +    void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --

    As of now, this is just a stub, but we will have to acknowledge the message. Will change the signature to make that clear.
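For illustration only: one way the acknowledgement mentioned above could show up in the signature is to let `requestSlot` return a future. This is a minimal sketch under stated assumptions, not the signature the PR ended up with. `SlotRequestAck`, `AckingTaskExecutorGateway`, `StubTaskExecutor` and `AckDemo` are hypothetical names, `java.util.UUID` stands in for `AllocationID`, and the JDK's `CompletableFuture` stands in for whatever future type the RPC layer actually uses.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical acknowledgement message; not part of the PR. */
final class SlotRequestAck {
    final UUID allocationId;
    final boolean accepted;

    SlotRequestAck(UUID allocationId, boolean accepted) {
        this.allocationId = allocationId;
        this.accepted = accepted;
    }
}

/** Hypothetical gateway variant whose return value carries the acknowledgement. */
interface AckingTaskExecutorGateway {
    CompletableFuture<SlotRequestAck> requestSlot(UUID allocationId, UUID resourceManagerLeaderId);
}

/** Stub TaskExecutor that only accepts requests from the leader it knows about. */
final class StubTaskExecutor implements AckingTaskExecutorGateway {
    private final UUID knownLeaderId;

    StubTaskExecutor(UUID knownLeaderId) {
        this.knownLeaderId = knownLeaderId;
    }

    @Override
    public CompletableFuture<SlotRequestAck> requestSlot(UUID allocationId, UUID resourceManagerLeaderId) {
        // Reject requests that carry a stale or unknown leader id.
        boolean accepted = knownLeaderId.equals(resourceManagerLeaderId);
        return CompletableFuture.completedFuture(new SlotRequestAck(allocationId, accepted));
    }
}

final class AckDemo {
    public static void main(String[] args) {
        UUID leaderId = UUID.randomUUID();
        AckingTaskExecutorGateway gateway = new StubTaskExecutor(leaderId);
        // The caller (here standing in for the SlotManager) reacts to the reply.
        gateway.requestSlot(UUID.randomUUID(), leaderId)
            .thenAccept(ack -> System.out.println("accepted=" + ack.accepted));
    }
}
```

With a return value, the caller can tell a rejected request apart from a lost one, which a `void` method cannot express.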
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629858

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
         /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
         private final JobID jobID;
    -    public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -        this(slotID, profiler, null, null);
    +    /** Gateway to the TaskManager which reported the SlotStatus */
    +    private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --

    It comes with the SlotReport from the TaskExecutor. Yes, it breaks Serializable. Will change the code to contain the String address instead.
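A small sketch of the serialization problem discussed here, using only JDK classes. The names `Gateway`, `StatusWithGateway`, `StatusWithAddress` and `SerializationCheck` are hypothetical stand-ins and not the classes from the PR. The sketch assumes that a live gateway handle is not `Serializable`, which is what the review comment implies.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for an RPC gateway; deliberately not Serializable. */
interface Gateway {
    void requestSlot(String allocationId);
}

/** Status message that carries the gateway itself (the problematic variant). */
final class StatusWithGateway implements Serializable {
    private static final long serialVersionUID = 1L;
    final String slotId;
    final Gateway gateway; // non-serializable field value

    StatusWithGateway(String slotId, Gateway gateway) {
        this.slotId = slotId;
        this.gateway = gateway;
    }
}

/** Status message that carries only the address of the reporting TaskExecutor. */
final class StatusWithAddress implements Serializable {
    private static final long serialVersionUID = 1L;
    final String slotId;
    final String taskExecutorAddress;

    StatusWithAddress(String slotId, String taskExecutorAddress) {
        this.slotId = slotId;
        this.taskExecutorAddress = taskExecutorAddress;
    }
}

final class SerializationCheck {
    /** Returns true if Java serialization accepts the object graph. */
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Gateway gateway = allocationId -> { };
        System.out.println(isSerializable(new StatusWithGateway("slot-1", gateway)));
        System.out.println(isSerializable(new StatusWithAddress("slot-1", "/te1")));
    }
}
```

With the address variant, the receiving side has to resolve the address to a gateway itself, which matches the suggestion to retrieve the gateway on the `ResourceManager` side.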
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628362

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

    The registry exists so that it can be passed on to components that want to retrieve the current leader UUID. Passing on only a single reference wouldn't work.
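To make the argument concrete, here is a minimal sketch of the difference between handing a component the UUID itself and handing it a shared holder. It assumes the concern is that the leader id can change after the component has been constructed. `LeaderIdHolder`, `SnapshotComponent` and `HolderComponent` are hypothetical names; the actual `LeaderIdRegistry` in the PR may look different.

```java
import java.util.UUID;

/** Hypothetical mutable holder for the current leader id. */
final class LeaderIdHolder {
    private volatile UUID leaderId;

    LeaderIdHolder(UUID initialLeaderId) {
        this.leaderId = initialLeaderId;
    }

    UUID getLeaderId() {
        return leaderId;
    }

    void setLeaderId(UUID newLeaderId) {
        this.leaderId = newLeaderId;
    }
}

/** Receives the UUID directly and therefore only ever sees that one value. */
final class SnapshotComponent {
    private final UUID leaderId;

    SnapshotComponent(UUID leaderId) {
        this.leaderId = leaderId;
    }

    UUID currentLeader() {
        return leaderId;
    }
}

/** Receives the holder and looks the id up on every call. */
final class HolderComponent {
    private final LeaderIdHolder holder;

    HolderComponent(LeaderIdHolder holder) {
        this.holder = holder;
    }

    UUID currentLeader() {
        return holder.getLeaderId();
    }
}

final class LeaderIdDemo {
    public static void main(String[] args) {
        LeaderIdHolder holder = new LeaderIdHolder(UUID.randomUUID());
        SnapshotComponent snapshot = new SnapshotComponent(holder.getLeaderId());
        HolderComponent viaHolder = new HolderComponent(holder);

        holder.setLeaderId(UUID.randomUUID()); // simulate a leader change

        System.out.println("snapshot up to date: " + snapshot.currentLeader().equals(holder.getLeaderId()));
        System.out.println("holder up to date:   " + viaHolder.currentLeader().equals(holder.getLeaderId()));
    }
}
```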
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629249

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
          * @return Slot assignment
          */
         @RpcMethod
    -    public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -        System.out.println("SlotRequest: " + slotRequest);
    -        return new SlotAssignment();
    +    public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +        final JobID jobId = slotRequest.getJobId();
    +        final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +        if (jobMasterGateway != null) {
    +            return slotManager.requestSlot(slotRequest);
    +        } else {
    +            LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +            return null;
    --- End diff --

    The rationale here was to simply ignore this request because the JobManager is not registered. You're right, probably better to reply with a meaningful answer.
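As a rough sketch of what "a meaningful answer" could look like instead of `null`: the caller gets an explicit rejection with a reason. All names below (`SlotRequestReply`, `SlotRequestAccepted`, `SlotRequestRejected`, `ReplyingResourceManager`) are hypothetical and plain strings stand in for `JobID` and `AllocationID`; the PR itself does not define a negative response type.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical base type for replies to a slot request. */
abstract class SlotRequestReply {
    final String allocationId;

    SlotRequestReply(String allocationId) {
        this.allocationId = allocationId;
    }
}

/** Positive reply: the request was registered. */
final class SlotRequestAccepted extends SlotRequestReply {
    SlotRequestAccepted(String allocationId) {
        super(allocationId);
    }
}

/** Negative reply: the request was declined, with a reason for the caller. */
final class SlotRequestRejected extends SlotRequestReply {
    final String reason;

    SlotRequestRejected(String allocationId, String reason) {
        super(allocationId);
        this.reason = reason;
    }
}

/** Simplified stand-in for the ResourceManager side of the exchange. */
final class ReplyingResourceManager {
    private final Set<String> registeredJobIds = new HashSet<>();

    void registerJobMaster(String jobId) {
        registeredJobIds.add(jobId);
    }

    SlotRequestReply requestSlot(String jobId, String allocationId) {
        if (registeredJobIds.contains(jobId)) {
            return new SlotRequestAccepted(allocationId);
        }
        // Never return null: tell the caller why the request was declined.
        return new SlotRequestRejected(allocationId, "unknown JobMaster for job " + jobId);
    }

    public static void main(String[] args) {
        ReplyingResourceManager rm = new ReplyingResourceManager();
        rm.registerJobMaster("job-1");
        System.out.println(rm.requestSlot("job-1", "alloc-1").getClass().getSimpleName());
        System.out.println(rm.requestSlot("job-2", "alloc-2").getClass().getSimpleName());
    }
}
```

Throwing an exception that the caller handles, the third option raised in the review, would be an equally valid way to avoid the `null`.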
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628600

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    -    private final Map jobMasterGateways;
    +
    +    private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

    No particular reason other than I want to make sure future subclasses log with the correct class name.
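The difference between the two logger styles can be shown with a short sketch. It uses `java.util.logging` from the JDK as a stand-in so that it runs without extra dependencies (the PR uses SLF4J's `LoggerFactory`), and `BaseManager` and `SubManager` are hypothetical class names.

```java
import java.util.logging.Logger;

class BaseManager {
    /** Static logger: always named after the declaring class. */
    private static final Logger STATIC_LOG = Logger.getLogger(BaseManager.class.getName());

    /** Instance logger: named after the runtime class of the object. */
    private final Logger instanceLog = Logger.getLogger(getClass().getName());

    String staticLoggerName() {
        return STATIC_LOG.getName();
    }

    String instanceLoggerName() {
        return instanceLog.getName();
    }
}

/** Subclass that inherits both loggers without declaring its own. */
class SubManager extends BaseManager {
}

final class LoggerNameDemo {
    public static void main(String[] args) {
        BaseManager manager = new SubManager();
        System.out.println("static logger:   " + manager.staticLoggerName());
        System.out.println("instance logger: " + manager.instanceLoggerName());
    }
}
```

The instance logger costs one field per object, which is the usual argument for the static variant when no subclass-specific logger name is needed.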
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524626

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

    Why do you create a registry for a single field?
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524466

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524239

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
         /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
         private final JobID jobID;
    -    public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -        this(slotID, profiler, null, null);
    +    /** Gateway to the TaskManager which reported the SlotStatus */
    +    private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --

    The `SlotStatus` is no longer serializable with this field. Where does the `SlotStatus` come from? If it's coming from the `TaskExecutor`, then the `taskExecutorGateway` has to be retrieved on the `ResourceManager` side.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77523985

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
         // void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +    /**
    +     * Send by the ResourceManager to the TaskExecutor
    +     * @param allocationID id for the request
    +     * @param resourceManagerLeaderID current leader id of the ResourceManager
    +     */
    +    void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --

    How is the confirmation of the `TaskExecutor` sent back to the `SlotManager`? Would it make sense to send it back via the return value of this method?
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77523806

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522737

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
          * @return Slot assignment
          */
         @RpcMethod
    -    public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -        System.out.println("SlotRequest: " + slotRequest);
    -        return new SlotAssignment();
    +    public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +        final JobID jobId = slotRequest.getJobId();
    +        final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +        if (jobMasterGateway != null) {
    +            return slotManager.requestSlot(slotRequest);
    +        } else {
    +            LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +            return null;
    --- End diff --

    Not sure whether we should return `null` here, return a negative `SlotRequestRegistered` response, or throw an exception which will be handled by the caller. Why did you choose `null`?
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522339

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    -    private final Map jobMasterGateways;
    +
    +    private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

    Why not make it static?
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2463

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

    This implements and tests the ResourceManager part of the protocol for slot allocation.

    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot and notify from SlotManager
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2463

commit 213f4ee6a30bd87e9e04c5a4b22022e0636db9e9
Author: Maximilian Michels
Date: 2016-09-01T14:53:31Z

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot