[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-20 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/2463


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78212144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

Thank you for your comments @beyond1920. Your observations are correct. 
I've skipped this part of the implementation and wanted to address it next.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78194763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

These are valid points, I will change the code to use the 
`LeaderRetrievalListener` instead.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-08 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77970827
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Good point. Will use that one instead.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77943543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

RpcEndpoint already has a protected log field. Why not use that instead?
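
To illustrate the suggestion, here is a minimal sketch of a subclass reusing a
protected logger from its base class. `Endpoint` and `Manager` are hypothetical
stand-ins for `RpcEndpoint` and `ResourceManager`, and `java.util.logging` is
used only to keep the example self-contained; the real classes may differ.

```java
import java.util.logging.Logger;

public class InheritedLoggerSketch {

    /** Hypothetical stand-in for a base class that exposes a protected logger. */
    static abstract class Endpoint {
        // getClass() resolves to the runtime subclass, so the logger is named after it
        protected final Logger log = Logger.getLogger(getClass().getName());
    }

    /** Hypothetical stand-in for a subclass; it declares no logger field of its own. */
    static class Manager extends Endpoint {
        String loggerName() {
            return log.getName();
        }

        void start() {
            log.info("Manager started");
        }
    }
}
```

Because the base class names the logger after `getClass()`, messages logged by
the subclass are still attributed to the subclass.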




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77787126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

The class has some docs, but as you can see from my initial question, its 
purpose was not clear to me.

Yes, I actually thought about marking `leaderSessionID` `volatile`. 

Given the interface of this class, every component which has a reference to 
this registry is allowed to change the leader session ID. This can be 
problematic because components other than the `ResourceManager` should only be 
allowed to retrieve the leader session ID.

I'm actually wondering whether we need to notify the components about a new 
leader session ID anyway. For example, the `SlotManager` should probably 
free its registered slots when it loses the leadership. Wouldn't these calls be 
suitable to transmit the current leader session ID? 
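
A minimal sketch of that notification idea, under stated assumptions: only the
leader publishes the session ID, and other components receive it through a
callback instead of a shared mutable registry. All names here
(`LeaderSessionListener`, `SlotBookkeeping`, `LeaderEndpoint`) are hypothetical
and do not come from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class LeaderSessionSketch {

    /** Hypothetical callback through which components learn about leadership changes. */
    interface LeaderSessionListener {
        /** A null ID signals that leadership was lost. */
        void onLeaderSessionChanged(UUID newLeaderSessionId);
    }

    /** Hypothetical stand-in for a component such as the SlotManager. */
    static class SlotBookkeeping implements LeaderSessionListener {
        private final List<String> registeredSlots = new ArrayList<>();
        private UUID leaderSessionId;

        void registerSlot(String slotId) {
            registeredSlots.add(slotId);
        }

        int slotCount() {
            return registeredSlots.size();
        }

        UUID currentLeaderSessionId() {
            return leaderSessionId;
        }

        @Override
        public void onLeaderSessionChanged(UUID newLeaderSessionId) {
            leaderSessionId = newLeaderSessionId;
            if (newLeaderSessionId == null) {
                // free all registered slots once leadership is lost
                registeredSlots.clear();
            }
        }
    }

    /** Hypothetical stand-in for the leader; the only place that publishes the ID. */
    static class LeaderEndpoint {
        private final List<LeaderSessionListener> listeners = new ArrayList<>();

        void addListener(LeaderSessionListener listener) {
            listeners.add(listener);
        }

        void grantLeadership(UUID leaderSessionId) {
            publish(leaderSessionId);
        }

        void revokeLeadership() {
            publish(null);
        }

        private void publish(UUID leaderSessionId) {
            for (LeaderSessionListener listener : listeners) {
                listener.onLeaderSessionChanged(leaderSessionId);
            }
        }
    }
}
```

With this shape, listeners have no setter to call, so they cannot change the
leader session ID themselves.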




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77783497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

What exactly do you mean? The class is thread-safe and documented (though 
documentation can be improved). There is no need for locking. Do you mean 
marking the leaderSessionID `volatile`? It should be fine if leader changes 
propagate lazily.
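
For reference, a minimal sketch of what the `volatile` variant could look like.
The class and method names are hypothetical and are not the code in this pull
request. A `volatile` field guarantees that a write by one thread is visible to
later reads by other threads without locking; a plain field carries no such
visibility guarantee.

```java
import java.util.UUID;

/** Hypothetical sketch of a lock-free holder for the current leader session ID. */
public class VolatileLeaderIdSketch {

    // volatile: a write becomes visible to subsequent reads from any thread
    private volatile UUID leaderSessionId;

    public UUID getLeaderSessionId() {
        return leaderSessionId;
    }

    public void setLeaderSessionId(UUID leaderSessionId) {
        // a single reference write is atomic, so no lock is needed here
        this.leaderSessionId = leaderSessionId;
    }
}
```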




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77769044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

The response from the taskExecutor can take one of three forms:
1. Ack: the taskExecutor gives the slot to the specified jobMaster as 
expected. 
2. Decline: the slot is already occupied by another AllocationID. 
3. Timeout: possibly caused by a lost request or response message, or by 
slow network transfer. 
In the first case, the SlotManager needs to do nothing. In the second and 
third cases, however, the SlotManager will first verify and clear all previous 
allocation information for this slot request, then try again to find a 
suitable slot for the request. I think we should add logic to handle these 
three possible responses from the taskExecutor.
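
The three cases could be handled roughly as in the sketch below. This is only
an illustration under stated assumptions: `Outcome`, `recordAllocation`,
`onResponse` and the string IDs are hypothetical, and re-queuing the request
stands in for "try to find a slot again".

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names come from the pull request. */
public class SlotResponseSketch {

    /** The three possible outcomes of a slot request sent to a task executor. */
    enum Outcome { ACK, DECLINE, TIMEOUT }

    // slotId -> allocationId, standing in for the allocation bookkeeping
    private final Map<String, String> allocations = new HashMap<>();
    // allocation IDs that still need a slot
    private final Deque<String> pendingRequests = new ArrayDeque<>();

    void recordAllocation(String slotId, String allocationId) {
        allocations.put(slotId, allocationId);
    }

    void onResponse(String slotId, String allocationId, Outcome outcome) {
        if (outcome == Outcome.ACK) {
            return; // slot was handed over as expected, nothing to do
        }
        // DECLINE or TIMEOUT: verify that the slot is still mapped to this
        // allocation, clear the bookkeeping, and queue the request for a retry
        if (allocations.remove(slotId, allocationId)) {
            pendingRequests.add(allocationId);
        }
    }

    boolean isAllocated(String slotId) {
        return allocations.containsKey(slotId);
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}
```

The two-argument `Map.remove` only clears the entry if the slot is still
assigned to the same allocation, so a late or duplicate response does not
queue the request twice.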




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77768256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

ResourceManager keeps a relationship between resourceID and 
TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID 
using ResourceManager here?
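
A rough sketch of such a lookup, assuming a simple map keyed by resource ID.
`Gateway`, `register` and `lookup` are hypothetical names, and plain strings
stand in for `ResourceID`; how the ResourceManager actually stores its gateways
is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of resolving a gateway by resource ID. */
public class GatewayLookupSketch {

    /** Hypothetical stand-in for TaskExecutorGateway. */
    interface Gateway {
        String address();
    }

    // resourceId -> gateway, filled when a task executor registers
    private final Map<String, Gateway> gatewaysByResourceId = new HashMap<>();

    void register(String resourceId, Gateway gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    /** Returns an empty Optional if no task executor is registered under this ID. */
    Optional<Gateway> lookup(String resourceId) {
        return Optional.ofNullable(gatewaysByResourceId.get(resourceId));
    }
}
```

Returning an `Optional` makes the "unknown resource" case explicit for the
caller, which then has to decide what to do when the task executor is gone.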




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77651399
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

But then this field should probably be marked as `protected` instead of 
`private`.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77651201
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 
b

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650487
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
--- End diff --

Should extend `TestLogger`




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650617
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 
b

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Alright, but then this class should be made thread safe and the docs should 
state the purpose.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77630918
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 
become rea

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77630351
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 
become rea

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r7763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+
+   /**
+* Send by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the 
ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID);
--- End diff --

As of now, this is just a stub but we will have to acknowledge the message. 
Will change the signature to make that clear.
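
To make the acknowledgement concrete, here is a minimal sketch of a `requestSlot` that replies through its return value. All type names in it (`SketchSlotReply`, `SketchSlotAccepted`, `SketchSlotRejected`, `AckingTaskExecutorGateway`, `ToyTaskExecutor`) are hypothetical and not part of this pull request. The allocation id is a plain `String` to keep the example self-contained, and a real RPC gateway would presumably wrap the reply in a future.

```java
import java.util.UUID;

// Hypothetical reply types; none of these names come from the pull request.
abstract class SketchSlotReply {
    final String allocationId;

    SketchSlotReply(String allocationId) {
        this.allocationId = allocationId;
    }
}

final class SketchSlotAccepted extends SketchSlotReply {
    SketchSlotAccepted(String allocationId) {
        super(allocationId);
    }
}

final class SketchSlotRejected extends SketchSlotReply {
    SketchSlotRejected(String allocationId) {
        super(allocationId);
    }
}

// Assumed shape of the gateway method once it returns an acknowledgement.
interface AckingTaskExecutorGateway {
    SketchSlotReply requestSlot(String allocationId, UUID resourceManagerLeaderId);
}

// Toy TaskExecutor that accepts requests only from the leader it currently knows.
final class ToyTaskExecutor implements AckingTaskExecutorGateway {
    private final UUID knownLeaderId;

    ToyTaskExecutor(UUID knownLeaderId) {
        this.knownLeaderId = knownLeaderId;
    }

    @Override
    public SketchSlotReply requestSlot(String allocationId, UUID resourceManagerLeaderId) {
        if (knownLeaderId.equals(resourceManagerLeaderId)) {
            return new SketchSlotAccepted(allocationId);
        }
        // A stale or unknown leader id is answered explicitly instead of being dropped.
        return new SketchSlotRejected(allocationId);
    }
}
```

With a return value, the caller can tell an accepted request from a rejected one without a separate callback message.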


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77629858
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

It comes with the SlotReport from the TaskExecutor. Yes, it breaks 
Serializable. Will change the code to contain the String address instead.
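
As a rough illustration of that change, the sketch below uses a simplified stand-in for `SlotStatus` that stores the reporter's address as a `String` and therefore survives Java serialization. The class and field names (`AddressedSlotStatus`, `taskExecutorAddress`, `SlotStatusRoundTrip`) are assumptions for this example only; the real class has more fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Simplified, hypothetical stand-in for SlotStatus: it carries the reporter's
// RPC address as a plain String instead of a gateway object.
final class AddressedSlotStatus implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String slotId;
    private final String taskExecutorAddress;

    AddressedSlotStatus(String slotId, String taskExecutorAddress) {
        this.slotId = slotId;
        this.taskExecutorAddress = taskExecutorAddress;
    }

    String getSlotId() {
        return slotId;
    }

    String getTaskExecutorAddress() {
        return taskExecutorAddress;
    }
}

final class SlotStatusRoundTrip {
    // Serializes and deserializes the status, as would happen when it is sent over the wire.
    static AddressedSlotStatus copyViaSerialization(AddressedSlotStatus status) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(status);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AddressedSlotStatus) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

The receiving side would then resolve the address to a gateway itself, which matches the suggestion to retrieve the gateway on the `ResourceManager` side.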




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77628362
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

In order to pass it on to components who want to retrieve the current 
leader UUID. Passing on only a single reference wouldn't work.
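
A minimal sketch of the idea, assuming the registry is a small mutable holder shared between components. The names `SketchLeaderIdRegistry` and `SketchLeaderAwareComponent` and the `volatile` field are assumptions of this example; the actual `LeaderIdRegistry` body is not shown in the diff above.

```java
import java.util.UUID;

// Hypothetical holder for the current leader id. Components keep a reference to
// the holder, so they observe later updates made by whoever tracks leadership.
final class SketchLeaderIdRegistry {
    private volatile UUID leaderId;

    UUID getLeaderId() {
        return leaderId;
    }

    void setLeaderId(UUID leaderId) {
        this.leaderId = leaderId;
    }
}

// Hypothetical component that needs the current leader id for each outgoing request.
final class SketchLeaderAwareComponent {
    private final SketchLeaderIdRegistry registry;

    SketchLeaderAwareComponent(SketchLeaderIdRegistry registry) {
        this.registry = registry;
    }

    // Reads the id at call time rather than caching the UUID handed over at construction.
    UUID leaderIdForNextRequest() {
        return registry.getLeaderId();
    }
}
```

Had the component received only a `UUID` at construction time, it would keep using the old id after a leader change.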




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77629249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+   return null;
--- End diff --

The rationale here was to simply ignore this request because the JobManager 
is not registered. You're right, probably better to reply with a meaningful 
answer. 
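
One possible shape for such a meaningful answer is sketched below: the request for an unknown job is answered with an explicit rejection instead of `null`. Every name here (`SketchRequestReply`, `SketchRequestRegistered`, `SketchRequestRejected`, `SketchResourceManager`) is hypothetical, and job and allocation ids are plain strings to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reply hierarchy; the pull request itself only shows SlotRequestRegistered.
abstract class SketchRequestReply {
    final String allocationId;

    SketchRequestReply(String allocationId) {
        this.allocationId = allocationId;
    }
}

final class SketchRequestRegistered extends SketchRequestReply {
    SketchRequestRegistered(String allocationId) {
        super(allocationId);
    }
}

final class SketchRequestRejected extends SketchRequestReply {
    final String reason;

    SketchRequestRejected(String allocationId, String reason) {
        super(allocationId);
        this.reason = reason;
    }
}

// Toy resource manager that tracks registered JobMasters by job id.
final class SketchResourceManager {
    private final Map<String, String> jobMasterAddresses = new HashMap<>();

    void registerJobMaster(String jobId, String address) {
        jobMasterAddresses.put(jobId, address);
    }

    SketchRequestReply requestSlot(String jobId, String allocationId) {
        if (jobMasterAddresses.containsKey(jobId)) {
            return new SketchRequestRegistered(allocationId);
        }
        // Unknown job: answer explicitly so the caller does not have to null-check.
        return new SketchRequestRejected(
            allocationId, "No JobMaster registered for job " + jobId);
    }
}
```

Throwing an exception, the third option raised in the review, would be an alternative; the explicit reply keeps the failure inside the normal message flow.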




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-06 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77628600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

No particular reason other than I want to make sure future subclasses log 
with the correct class name.
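
The difference can be seen in a small sketch. It uses `java.util.logging` instead of SLF4J so that it runs without extra dependencies, and the class names `SketchBaseEndpoint` and `SketchSubclassEndpoint` are made up for the example.

```java
import java.util.logging.Logger;

class SketchBaseEndpoint {
    // Static logger: always named after the class that declares it.
    static final Logger STATIC_LOG = Logger.getLogger(SketchBaseEndpoint.class.getName());

    // Instance logger: named after the runtime class, so subclasses get their own name.
    final Logger log = Logger.getLogger(getClass().getName());
}

class SketchSubclassEndpoint extends SketchBaseEndpoint {
}
```

A `SketchSubclassEndpoint` instance logs through `log` under its own class name, while `STATIC_LOG` stays tied to `SketchBaseEndpoint`. The instance field costs one logger lookup per object, which is the usual argument for the static variant.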




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Why do you create a registry for a single field?




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524466
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

The `SlotStatus` is no longer serializable with this field. Where does the 
`SlotStatus` come from? If it's coming from the `TaskExecutor`, then the 
`taskExecutorGateway` has to be retrieved on the `ResourceManager` side.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+
+   /**
+* Send by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the 
ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID);
--- End diff --

How is the confirmation of the `TaskExecutor` sent back to the 
`SlotManager`? Would it make sense to send it back via the return value of this 
method?




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+   return null;
--- End diff --

Not sure whether we should return `null` here, a negative 
`SlotRequestRegistered` response or throw an exception which will be handled by 
the caller. Why did you choose `null`?




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Why not make it static?




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-02 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2463

[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

This implements and tests the ResourceManager part of the protocol for slot 
allocation.

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot and notify from SlotManager
- adds SlotManager as RM constructor parameter
- adds LeaderIdRetriever to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2463


commit 213f4ee6a30bd87e9e04c5a4b22022e0636db9e9
Author: Maximilian Michels 
Date:   2016-09-01T14:53:31Z

[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderIdRetriever to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot



