[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5091


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156948978
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

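The class Javadoc quoted above describes a tree of task slots. A root `MultiTaskSlot` is bound to one allocated slot, `SingleTaskSlot` leaves hold individual tasks, and nested `MultiTaskSlot`s represent co-location constraints. Under any node there is at most one child per `AbstractID`. As a rough illustration only, the following Java sketch models that invariant with hypothetical classes (`TaskSlotNode`, `MultiNode`, `LeafNode`) and plain `String` ids in place of `AbstractID` and `SlotRequestId`. It is not Flink's implementation. The `onReleased` callback on the root is an assumed stand-in for the comment that `allocatedSlotActions` is needed to release the allocated slot once the complete hierarchy has been released.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the slot sharing hierarchy; not Flink's classes. */
public class SlotTreeSketch {

    /** Every node is keyed by the id of the task or co-location constraint it represents. */
    abstract static class TaskSlotNode {
        final String groupId;

        TaskSlotNode(String groupId) {
            this.groupId = groupId;
        }

        /** Releases this node (and its subtree) and records the released ids in order. */
        abstract void release(List<String> releasedLog);
    }

    /** Leaf node: exactly one task runs in it. */
    static final class LeafNode extends TaskSlotNode {
        LeafNode(String groupId) {
            super(groupId);
        }

        @Override
        void release(List<String> releasedLog) {
            releasedLog.add(groupId);
        }
    }

    /** Inner node: at most one child per group id. */
    static final class MultiNode extends TaskSlotNode {
        private final Map<String, TaskSlotNode> children = new LinkedHashMap<>();
        private final Runnable onReleased; // assumed hook, e.g. hand the allocated slot back

        MultiNode(String groupId, Runnable onReleased) {
            super(groupId);
            this.onReleased = onReleased;
        }

        boolean contains(String childGroupId) {
            return children.containsKey(childGroupId);
        }

        LeafNode allocateLeaf(String childGroupId) {
            return addChild(new LeafNode(childGroupId));
        }

        MultiNode allocateMulti(String childGroupId) {
            return addChild(new MultiNode(childGroupId, () -> { }));
        }

        private <T extends TaskSlotNode> T addChild(T child) {
            if (children.containsKey(child.groupId)) {
                throw new IllegalStateException("Slot already contains a child for " + child.groupId);
            }
            children.put(child.groupId, child);
            return child;
        }

        @Override
        void release(List<String> releasedLog) {
            // Release the whole subtree first, then this node, then notify the owner.
            for (TaskSlotNode child : children.values()) {
                child.release(releasedLog);
            }
            children.clear();
            releasedLog.add(groupId);
            onReleased.run();
        }
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        boolean[] slotReturned = {false};

        // Root: one allocated slot shared by the tasks of a slot sharing group.
        MultiNode root = new MultiNode("root", () -> slotReturned[0] = true);
        root.allocateLeaf("source");
        root.allocateLeaf("map");

        // Co-location constraint: a nested multi node whose leaves are the co-located tasks.
        MultiNode coLocated = root.allocateMulti("coLocation-1");
        coLocated.allocateLeaf("head");
        coLocated.allocateLeaf("tail");

        try {
            root.allocateLeaf("map"); // a second child with the same id is rejected
        } catch (IllegalStateException expected) {
            System.out.println(expected.getMessage());
        }

        root.release(released);
        System.out.println(released);        // [source, map, head, tail, coLocation-1, root]
        System.out.println(slotReturned[0]); // true
    }
}
```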
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156902262
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
--- End diff --

True.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156902096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156902613
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+   @Test
+   public void testRootSlotCreation() {
+   final SlotSharingManager slotSharingManager = new SlotSharingManager(
+   slotSharingGroupId,
+   allocatedSlotActions,
+   slotOwner);
+
+   SlotRequestId slotRequestId = new SlotRequestId();
+   SlotRequestId allocatedSlotRequestId = new SlotRequestId();
+
+   final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
+   slotRequestId,
+   new CompletableFuture<>(),
+   allocatedSlotRequestId);
+
+   assertEquals(slotRequestId, multiTaskSlot.getSlotRequestId());
+   assertNotNull(slotSharingManager.getTaskSlot(slotRequestId));
+   }
+
+   @Test
+   public void testRootSlotRelease() throws ExecutionException, InterruptedException {
+   final CompletableFuture slotReleasedFuture = new CompletableFuture<>();
+   final TestingAllocatedSlotActions testingAllocatedSlotActions = new TestingAllocatedSlotActions();
--- End diff --

Indeed. Will change it.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156905291
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
 }
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical slot to
+   CompletableFuture slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+

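The `internalAllocateSlot` method quoted above dispatches on the `ScheduledUnit`. With a slot sharing group, it obtains a `SlotSharingManager` for that group via `computeIfAbsent`. Otherwise, it requests a dedicated allocated slot and forwards `allowQueuedScheduling`. The Java sketch below reproduces only that control flow, under stated assumptions:

- `SlotPoolSketch`, `SharingGroupManager`, `NoSlotAvailableException`, `offerSlot` and the string slot ids are hypothetical.
- The shared branch is a placeholder.
- Queued scheduling is assumed to mean that a request which cannot be served right now waits for a slot to be offered later, while a non-queued request fails immediately.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the allocation dispatch; not Flink's SlotPool. */
public class SlotPoolSketch {

    /** Stand-in for NoResourceAvailableException. */
    static class NoSlotAvailableException extends Exception {
        NoSlotAvailableException(String message) {
            super(message);
        }
    }

    /** Stand-in for a per-group SlotSharingManager: here it only counts the tasks it serves. */
    static class SharingGroupManager {
        final String groupId;
        int tasks;

        SharingGroupManager(String groupId) {
            this.groupId = groupId;
        }
    }

    private final Queue<String> availableSlots = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
    final Map<String, SharingGroupManager> sharingManagers = new HashMap<>();

    /** sharingGroupId may be null, mirroring a ScheduledUnit without a slot sharing group. */
    CompletableFuture<String> allocate(String sharingGroupId, boolean allowQueuedScheduling) {
        if (sharingGroupId != null) {
            // One manager per slot sharing group, created on first use.
            SharingGroupManager manager =
                sharingManagers.computeIfAbsent(sharingGroupId, SharingGroupManager::new);
            manager.tasks++;
            // Placeholder: real code would pick or create a multi task slot in the manager here.
            return CompletableFuture.completedFuture("shared-slot-of-" + manager.groupId);
        }
        return requestDedicatedSlot(allowQueuedScheduling);
    }

    private CompletableFuture<String> requestDedicatedSlot(boolean allowQueuedScheduling) {
        String slot = availableSlots.poll();
        if (slot != null) {
            return CompletableFuture.completedFuture(slot);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        if (allowQueuedScheduling) {
            // Park the request until a slot is offered.
            pendingRequests.add(future);
        } else {
            // Report the failure through the future instead of throwing.
            future.completeExceptionally(new NoSlotAvailableException("no free slot"));
        }
        return future;
    }

    /** A newly offered slot serves the oldest pending request, otherwise it becomes available. */
    void offerSlot(String slot) {
        CompletableFuture<String> pending = pendingRequests.poll();
        if (pending != null) {
            pending.complete(slot);
        } else {
            availableSlots.add(slot);
        }
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();

        CompletableFuture<String> immediateFailure = pool.allocate(null, false);
        CompletableFuture<String> queued = pool.allocate(null, true);
        System.out.println(immediateFailure.isCompletedExceptionally()); // true
        System.out.println(queued.isDone());                             // false

        pool.offerSlot("slot-1");
        System.out.println(queued.join()); // slot-1

        pool.allocate("group-A", true);
        pool.allocate("group-A", true);
        System.out.println(pool.sharingManagers.get("group-A").tasks); // 2
    }
}
```

Reporting the missing slot through `completeExceptionally`, rather than by throwing, keeps success and failure on the same asynchronous path. The `catch` block in the diff does the same thing when it wraps `NoResourceAvailableException` with `FutureUtils.completedExceptionally`.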
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156903740
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
 ---
@@ -19,68 +19,104 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 public class ScheduledUnit {
-   
+
+   @Nullable
private final Execution vertexExecution;
-   
-   private final SlotSharingGroup sharingGroup;
-   
-   private final CoLocationConstraint locationConstraint;
+
+   private final JobVertexID jobVertexId;
+
+   @Nullable
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   @Nullable
+   private final CoLocationConstraint coLocationConstraint;

// 


public ScheduledUnit(Execution task) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = null;
-   this.locationConstraint = null;
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   null,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = sharingUnit;
-   this.locationConstraint = null;
+   public ScheduledUnit(Execution task, SlotSharingGroupId slotSharingGroupId) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) {
-   Preconditions.checkNotNull(task);
-   Preconditions.checkNotNull(sharingUnit);
-   Preconditions.checkNotNull(locationConstraint);
-   
+   public ScheduledUnit(
+   Execution task,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   JobVertexID jobVertexId,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   null,
+   jobVertexId,
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   Execution task,
+   JobVertexID jobVertexId,
--- End diff --

Yes, because `task` can be null. Will add the missing `@Nullable` annotation.
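The refactored `ScheduledUnit` routes every constructor through one canonical constructor in which the `Execution` may be absent. When a task is supplied, the job vertex id is derived from it. A stripped-down Java sketch of that constructor chain follows. `ScheduledUnitSketch`, its nested `Task` class and the `String` ids are hypothetical stand-ins for `Execution`, `JobVertexID`, `SlotSharingGroupId` and `CoLocationConstraint`. The nullability markers are written as comments so that the example compiles without the JSR-305 `@Nullable` annotation. Treating the job vertex id as mandatory is an assumption of the sketch.

```java
import java.util.Objects;

/** Hypothetical stand-in for ScheduledUnit's constructor chain; ids simplified to String. */
public class ScheduledUnitSketch {

    /** Stand-in for Execution: only exposes the id of its job vertex. */
    static final class Task {
        final String jobVertexId;

        Task(String jobVertexId) {
            this.jobVertexId = jobVertexId;
        }
    }

    private final Task task;                   // nullable: absent when only a vertex id is known
    private final String jobVertexId;          // treated as mandatory in this sketch
    private final String slotSharingGroupId;   // nullable
    private final String coLocationConstraint; // nullable

    ScheduledUnitSketch(Task task) {
        // Arguments are evaluated left to right, so the null check runs before task is dereferenced.
        this(Objects.requireNonNull(task), task.jobVertexId, null, null);
    }

    ScheduledUnitSketch(Task task, String slotSharingGroupId) {
        this(Objects.requireNonNull(task), task.jobVertexId, slotSharingGroupId, null);
    }

    ScheduledUnitSketch(String jobVertexId, String slotSharingGroupId, String coLocationConstraint) {
        // No task available: only the vertex id is known.
        this(null, jobVertexId, slotSharingGroupId, coLocationConstraint);
    }

    /** Canonical constructor: all other constructors delegate here. */
    ScheduledUnitSketch(Task task, String jobVertexId, String slotSharingGroupId, String coLocationConstraint) {
        this.task = task;
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
        this.slotSharingGroupId = slotSharingGroupId;
        this.coLocationConstraint = coLocationConstraint;
    }

    Task getTask() {
        return task;
    }

    String getJobVertexId() {
        return jobVertexId;
    }

    String getSlotSharingGroupId() {
        return slotSharingGroupId;
    }

    String getCoLocationConstraint() {
        return coLocationConstraint;
    }

    public static void main(String[] args) {
        ScheduledUnitSketch fromTask = new ScheduledUnitSketch(new Task("vertex-1"), "group-A");
        System.out.println(fromTask.getJobVertexId()); // vertex-1

        ScheduledUnitSketch withoutTask = new ScheduledUnitSketch("vertex-2", "group-A", null);
        System.out.println(withoutTask.getTask() == null); // true
    }
}
```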


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156902956
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
--- End diff --

Will make it non-static.
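Background for the change to non-static fixtures: JUnit 4 creates a new instance of the test class for every `@Test` method. Instance fields therefore start fresh in each test, while a `static` field is created once and carries whatever state one test leaves behind into the next. The sketch below imitates that lifecycle by hand, without JUnit. `FixtureLifecycleSketch` and `RecordingActions` are hypothetical; `RecordingActions` stands in for a stateful helper such as `TestingAllocatedSlotActions`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of static vs. per-instance test fixtures (JUnit 4 lifecycle imitated by hand). */
public class FixtureLifecycleSketch {

    /** Stand-in for a stateful test helper that records what it was asked to release. */
    static final class RecordingActions {
        final List<String> released = new ArrayList<>();

        void releaseSlot(String slotId) {
            released.add(slotId);
        }
    }

    /** Shared by every instance, i.e. by every test method. */
    static final RecordingActions SHARED = new RecordingActions();

    /** Re-created for every instance, i.e. for every test method. */
    final RecordingActions perTest = new RecordingActions();

    /** A simulated test that releases one slot through both fixtures. */
    void releaseOneSlot(String slotId) {
        SHARED.releaseSlot(slotId);
        perTest.releaseSlot(slotId);
    }

    public static void main(String[] args) {
        // JUnit 4 would instantiate the class once per @Test method; do the same by hand.
        FixtureLifecycleSketch firstTest = new FixtureLifecycleSketch();
        firstTest.releaseOneSlot("slot-1");

        FixtureLifecycleSketch secondTest = new FixtureLifecycleSketch();
        secondTest.releaseOneSlot("slot-2");

        System.out.println(secondTest.perTest.released); // [slot-2]
        System.out.println(SHARED.released);             // [slot-1, slot-2]: state leaked from the first test
    }
}
```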


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156902071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
--- End diff --

Good catch.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156901374
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156896571
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture<LogicalSlot> allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture<LogicalSlot> internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical slot to
+   CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156892479
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java ---
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
--- End diff --

Good point. Will fix it.
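The `TERMINATED_PAYLOAD` excerpt above is a shared sentinel: a payload that has already terminated and whose termination future is completed up front, so anyone waiting on termination gets an already-done future. The sketch below illustrates only that pattern and is not the Flink code; the trimmed-down `Payload` interface with just `fail` and `getTerminalStateFuture` is an assumption made for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down payload interface; only the two methods needed for the illustration.
interface Payload {

    // Fail the payload with the given cause.
    void fail(Throwable cause);

    // Future that completes once the payload has reached a terminal state.
    CompletableFuture<?> getTerminalStateFuture();

    // Shared sentinel for a payload that has already terminated (implicitly public static final).
    Payload TERMINATED_PAYLOAD = new Payload() {

        // Instance field of the anonymous class, created once when the sentinel is initialized.
        private final CompletableFuture<?> completedTerminationFuture = CompletableFuture.completedFuture(null);

        @Override
        public void fail(Throwable cause) {
            // nothing to do: the payload has already terminated
        }

        @Override
        public CompletableFuture<?> getTerminalStateFuture() {
            return completedTerminationFuture;
        }
    };
}

public class TerminatedPayloadDemo {
    public static void main(String[] args) {
        Payload payload = Payload.TERMINATED_PAYLOAD;
        payload.fail(new RuntimeException("ignored"));
        System.out.println(payload.getTerminalStateFuture().isDone()); // true
    }
}
```

Because the future is created once per sentinel, every caller receives the same completed instance rather than a fresh allocation.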


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156892139
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture<LogicalSlot> allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture<LogicalSlot> internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
--- End diff --

True, will rename it.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156891235
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture<LogicalSlot> allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture<LogicalSlot> internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
--- End diff --

Yup, we wouldn't want the single task slot to leave after all.
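The branching in `internalAllocateSlot` quoted above can be summarized with a toy model. This is a hypothetical sketch, not Flink code: plain strings replace `SlotSharingGroupId`, `JobVertexID` and the allocated slots, and futures, locality, co-location constraints and queued scheduling are left out. It keeps two ideas from the diff visible: one sharing manager per group, created lazily via `computeIfAbsent`, and each root slot holding at most one leaf per job vertex, which is the invariant the `checkState` sanity check guards.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the branching in internalAllocateSlot.
// Plain strings stand in for SlotSharingGroupId, JobVertexID and the allocated slots.
public class SlotDispatchSketch {

    // Stand-in for SlotSharingManager: a list of root slots, each holding at most
    // one leaf per job vertex (the invariant behind the checkState sanity check).
    static final class SharingManager {
        private final List<Set<String>> roots = new ArrayList<>();

        // Place the vertex into the first root that does not contain it yet,
        // otherwise open a new root; returns the index of the chosen root.
        int allocateLeaf(String vertexId) {
            for (int i = 0; i < roots.size(); i++) {
                if (roots.get(i).add(vertexId)) { // add() returns false if already present
                    return i;
                }
            }
            Set<String> root = new HashSet<>();
            root.add(vertexId);
            roots.add(root);
            return roots.size() - 1;
        }
    }

    private final Map<String, SharingManager> sharingManagers = new HashMap<>();

    // groupId == null models a task without a slot sharing group.
    String allocate(String groupId, String vertexId) {
        if (groupId != null) {
            // one manager per sharing group, created lazily on first use
            SharingManager manager = sharingManagers.computeIfAbsent(groupId, id -> new SharingManager());
            return groupId + "/" + manager.allocateLeaf(vertexId) + "/" + vertexId;
        }
        // no sharing: the task gets a dedicated slot of its own
        return "dedicated/" + vertexId;
    }

    int numberOfSharingGroups() {
        return sharingManagers.size();
    }

    public static void main(String[] args) {
        SlotDispatchSketch pool = new SlotDispatchSketch();
        System.out.println(pool.allocate("g1", "map"));    // g1/0/map
        System.out.println(pool.allocate("g1", "reduce")); // g1/0/reduce
        System.out.println(pool.allocate("g1", "map"));    // g1/1/map (second subtask of "map")
        System.out.println(pool.allocate(null, "sink"));   // dedicated/sink
    }
}
```

A second subtask of the same vertex never lands in a root that already hosts that vertex; it opens a new root instead, which is why the sanity check in the real code should never fire.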


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156886562
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Simple {@link AllocatedSlotActions} implementations for testing purposes.
+ */
+public class TestingAllocatedSlotActions implements AllocatedSlotActions {
+
+   private volatile Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer;
+
+   public void setReleaseSlotConsumer(Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer) {
+   this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) {
+   Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer;
+
+   if (currentReleaseSlotConsumer != null) {
+   currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, cause));
--- End diff --

Will fix it.
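The `releaseSlot` excerpt above reads the volatile `releaseSlotConsumer` field into a local variable before the null check. A reduced, hypothetical version of that test-double pattern follows; `TestingReleaseActions` is an illustrative name, it returns a `boolean` instead of a future, and it passes a plain `String` instead of the `Tuple3`. Reading the field once guarantees that the null check and the call observe the same consumer even if another thread swaps it in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical test double illustrating the volatile-callback pattern.
class TestingReleaseActions {

    // volatile so that a consumer set by the test thread is visible to the thread calling release()
    private volatile Consumer<String> releaseConsumer;

    void setReleaseConsumer(Consumer<String> releaseConsumer) {
        this.releaseConsumer = Objects.requireNonNull(releaseConsumer);
    }

    // Returns true if a consumer was registered and has been notified.
    boolean release(String slotRequestId) {
        // read the volatile field once, so the null check and the call see the same value
        Consumer<String> current = this.releaseConsumer;
        if (current != null) {
            current.accept(slotRequestId);
            return true;
        }
        return false;
    }
}

public class VolatileCallbackDemo {
    public static void main(String[] args) {
        TestingReleaseActions actions = new TestingReleaseActions();
        List<String> released = new ArrayList<>();
        System.out.println(actions.release("r1")); // false: no consumer registered yet
        actions.setReleaseConsumer(released::add);
        System.out.println(actions.release("r2")); // true
        System.out.println(released);              // [r2]
    }
}
```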


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156890924
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156887983
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
--- End diff --

Will correct it.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156887632
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // good!
+   }
+
+   // some instances die
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // correct
+
+   }
+
+   scheduler.instanceDied(i1);
+   assertEquals(1, scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i3);
+   assertEquals(0, scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+   assertFalse(i1.isAlive());
+   assertFalse(i2.isAlive());
+   assertFalse(i3.isAlive());
+   }
+   catch (Exception e) {
--- End diff --

Yup, but I'll remove it.
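The quoted test checks a handful of bookkeeping invariants: the slot count is the sum over registered instances, an instance cannot be registered twice, and a dead instance cannot come back. A hypothetical model of just that bookkeeping makes the invariants explicit; `InstanceRegistry` is an illustrative name with string instance ids, not Flink's `Scheduler`. Its demo lets unexpected exceptions propagate from `main` instead of wrapping the body in a catch-all, so a failure keeps its stack trace.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the scheduler's instance bookkeeping; not Flink's Scheduler.
class InstanceRegistry {
    private final Map<String, Integer> slotsPerAliveInstance = new HashMap<>();
    private final Set<String> deadInstances = new HashSet<>();

    void newInstanceAvailable(String instanceId, int numberOfSlots) {
        if (deadInstances.contains(instanceId)) {
            throw new IllegalArgumentException("Instance is dead: " + instanceId);
        }
        if (slotsPerAliveInstance.putIfAbsent(instanceId, numberOfSlots) != null) {
            throw new IllegalArgumentException("Instance already registered: " + instanceId);
        }
    }

    void instanceDied(String instanceId) {
        // only a registered, alive instance can die; it is then remembered as dead
        if (slotsPerAliveInstance.remove(instanceId) != null) {
            deadInstances.add(instanceId);
        }
    }

    int getNumberOfAvailableInstances() {
        return slotsPerAliveInstance.size();
    }

    int getNumberOfAvailableSlots() {
        return slotsPerAliveInstance.values().stream().mapToInt(Integer::intValue).sum();
    }
}

public class InstanceRegistryDemo {
    // No catch-all around the body: an unexpected exception propagates with its stack trace.
    public static void main(String[] args) throws Exception {
        InstanceRegistry registry = new InstanceRegistry();
        registry.newInstanceAvailable("i1", 2);
        registry.newInstanceAvailable("i2", 2);
        registry.newInstanceAvailable("i3", 2);
        registry.instanceDied("i2");
        System.out.println(registry.getNumberOfAvailableInstances() + " " + registry.getNumberOfAvailableSlots()); // 2 4

        try {
            registry.newInstanceAvailable("i2", 2);
            throw new AssertionError("registry accepted a dead instance");
        } catch (IllegalArgumentException expected) {
            // expected: dead instances cannot be re-registered
        }
    }
}
```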


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156887866
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156888058
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
--- End diff --

Good point. Will change it.
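The hierarchy described in the `SlotSharingManager` Javadoc, together with the release cascade the `allocatedSlotActions` comment refers to, can be illustrated with a small tree model. The classes below reuse the names `TaskSlot`, `MultiTaskSlot` and `SingleTaskSlot` only for readability; they are a hypothetical simplification (string ids instead of `AbstractID`/`SlotRequestId`, no futures, no `SlotContext`), and the `Runnable` callback merely stands in for handing the allocated slot back once the whole hierarchy is empty.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of the TaskSlot hierarchy; strings stand in for AbstractID.
abstract class TaskSlot {
    final String id;            // task or co-location constraint id, unique among siblings
    final MultiTaskSlot parent; // null for the root

    TaskSlot(String id, MultiTaskSlot parent) {
        this.id = id;
        this.parent = parent;
    }
}

// Leaf node: exactly one task.
final class SingleTaskSlot extends TaskSlot {
    SingleTaskSlot(String id, MultiTaskSlot parent) {
        super(id, parent);
    }

    // Releasing a leaf detaches it from its parent, which may cascade upwards.
    void release() {
        parent.removeChild(id);
    }
}

// Inner node: holds children keyed by id and rejects duplicates.
final class MultiTaskSlot extends TaskSlot {
    private final Map<String, TaskSlot> children = new HashMap<>();
    private final Runnable onRootReleased; // only set on the root
    boolean released;

    private MultiTaskSlot(String id, MultiTaskSlot parent, Runnable onRootReleased) {
        super(id, parent);
        this.onRootReleased = onRootReleased;
    }

    static MultiTaskSlot root(Runnable onRootReleased) {
        return new MultiTaskSlot("root", null, onRootReleased);
    }

    boolean contains(String childId) {
        return children.containsKey(childId);
    }

    SingleTaskSlot allocateSingleTaskSlot(String childId) {
        return addChild(new SingleTaskSlot(childId, this));
    }

    // A nested MultiTaskSlot models a co-location constraint.
    MultiTaskSlot allocateMultiTaskSlot(String childId) {
        return addChild(new MultiTaskSlot(childId, this, null));
    }

    private <T extends TaskSlot> T addChild(T child) {
        if (children.putIfAbsent(child.id, child) != null) {
            throw new IllegalStateException("duplicate child id: " + child.id);
        }
        return child;
    }

    // An inner node whose last child is gone removes itself from its parent;
    // once the root is empty, the callback fires.
    void removeChild(String childId) {
        children.remove(childId);
        if (children.isEmpty()) {
            if (parent != null) {
                parent.removeChild(id);
            } else {
                released = true;
                onRootReleased.run();
            }
        }
    }
}

public class SlotHierarchyDemo {
    public static void main(String[] args) {
        int[] releasedRoots = {0};
        MultiTaskSlot root = MultiTaskSlot.root(() -> releasedRoots[0]++);
        SingleTaskSlot source = root.allocateSingleTaskSlot("source");
        MultiTaskSlot coLocated = root.allocateMultiTaskSlot("coLocation-1");
        SingleTaskSlot head = coLocated.allocateSingleTaskSlot("iterationHead");
        SingleTaskSlot tail = coLocated.allocateSingleTaskSlot("iterationTail");

        source.release();
        tail.release();
        head.release(); // empties the co-location node, which in turn empties the root
        System.out.println(root.released + " " + releasedRoots[0]); // true 1
    }
}
```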


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-13 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r156621784
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture<LogicalSlot> allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture<LogicalSlot> internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection<TaskManagerLocation> locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
+
+   

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-13 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r154871284
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
 ---
@@ -19,68 +19,104 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 public class ScheduledUnit {
-   
+
+   @Nullable
private final Execution vertexExecution;
-   
-   private final SlotSharingGroup sharingGroup;
-   
-   private final CoLocationConstraint locationConstraint;
+
+   private final JobVertexID jobVertexId;
+
+   @Nullable
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   @Nullable
+   private final CoLocationConstraint coLocationConstraint;

// 


public ScheduledUnit(Execution task) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = null;
-   this.locationConstraint = null;
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   null,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
-   Preconditions.checkNotNull(task);
-   
-   this.vertexExecution = task;
-   this.sharingGroup = sharingUnit;
-   this.locationConstraint = null;
+   public ScheduledUnit(Execution task, SlotSharingGroupId slotSharingGroupId) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   null);
}

-   public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) {
-   Preconditions.checkNotNull(task);
-   Preconditions.checkNotNull(sharingUnit);
-   Preconditions.checkNotNull(locationConstraint);
-   
+   public ScheduledUnit(
+   Execution task,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   Preconditions.checkNotNull(task),
+   task.getVertex().getJobvertexId(),
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   JobVertexID jobVertexId,
+   SlotSharingGroupId slotSharingGroupId,
+   CoLocationConstraint coLocationConstraint) {
+   this(
+   null,
+   jobVertexId,
+   slotSharingGroupId,
+   coLocationConstraint);
+   }
+
+   public ScheduledUnit(
+   Execution task,
+   JobVertexID jobVertexId,
--- End diff --

We can get the JobVertexID from the Execution. Do we need it as a constructor parameter?
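One way to act on this, sketched under assumptions: the constructors that take an `Execution` always derive the vertex id from it, a separate constructor covers the case where only the vertex id is known, and the all-arguments constructor becomes private so no caller can pair an `Execution` with a mismatching id. `JobVertexIdStub`, `ExecutionStub` and `ScheduledUnitSketch` are hypothetical stand-ins rather than Flink's classes, and the slot sharing group id is simplified to a `String`.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's JobVertexID (value semantics only).
final class JobVertexIdStub {
    private final String id;

    JobVertexIdStub(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof JobVertexIdStub && ((JobVertexIdStub) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical stand-in for Execution: it already knows its job vertex id.
final class ExecutionStub {
    private final JobVertexIdStub jobVertexId;

    ExecutionStub(JobVertexIdStub jobVertexId) {
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
    }

    JobVertexIdStub getJobVertexId() {
        return jobVertexId;
    }
}

final class ScheduledUnitSketch {
    private final ExecutionStub vertexExecution; // may be null
    private final JobVertexIdStub jobVertexId;   // never null
    private final String slotSharingGroupId;     // may be null (simplified id type)

    // With an Execution, the vertex id is always derived from it.
    ScheduledUnitSketch(ExecutionStub task, String slotSharingGroupId) {
        this(Objects.requireNonNull(task), task.getJobVertexId(), slotSharingGroupId);
    }

    // Without an Execution (e.g. in tests), only the vertex id is supplied.
    ScheduledUnitSketch(JobVertexIdStub jobVertexId, String slotSharingGroupId) {
        this(null, jobVertexId, slotSharingGroupId);
    }

    // Private: callers cannot combine an Execution with an unrelated vertex id.
    private ScheduledUnitSketch(ExecutionStub task, JobVertexIdStub jobVertexId, String slotSharingGroupId) {
        this.vertexExecution = task;
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
        this.slotSharingGroupId = slotSharingGroupId;
    }

    ExecutionStub getTaskToExecute() { return vertexExecution; }

    JobVertexIdStub getJobVertexId() { return jobVertexId; }

    String getSlotSharingGroupId() { return slotSharingGroupId; }
}
```

The design choice here is that the redundant parameter disappears from the public surface, while the single private constructor still keeps all field assignments in one place.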


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770364
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
--- End diff --

This instance is mutable, so it should not be `static`.
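A self-contained illustration of why this matters, under assumptions: JUnit 4 creates a new instance of the test class for every test method, so a mutable helper held in an instance field starts fresh for each test, whereas a `static` one carries state from one test into the next. `RecordingSlotActions` is a hypothetical stand-in for `TestingAllocatedSlotActions`, and the two `...Test` classes simulate the per-method instantiation by hand instead of depending on JUnit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable test helper: it records every slot release it is asked to perform.
final class RecordingSlotActions {
    private final List<String> releasedSlotRequestIds = new ArrayList<>();

    void releaseSlot(String slotRequestId) {
        releasedSlotRequestIds.add(slotRequestId);
    }

    int numberOfReleasedSlots() {
        return releasedSlotRequestIds.size();
    }
}

// Anti-pattern: one helper shared by every instance of the test class.
final class StaticFixtureTest {
    private static final RecordingSlotActions SHARED_SLOT_ACTIONS = new RecordingSlotActions();

    // Simulates one test method; returns how many releases the helper has seen so far.
    int runTest(String slotRequestId) {
        SHARED_SLOT_ACTIONS.releaseSlot(slotRequestId);
        return SHARED_SLOT_ACTIONS.numberOfReleasedSlots();
    }
}

// Preferred: the helper is an instance field, so each test instance gets its own copy.
final class InstanceFixtureTest {
    private final RecordingSlotActions allocatedSlotActions = new RecordingSlotActions();

    int runTest(String slotRequestId) {
        allocatedSlotActions.releaseSlot(slotRequestId);
        return allocatedSlotActions.numberOfReleasedSlots();
    }
}
```

Running two "tests" on separate instances, as JUnit 4 would, yields 1 and then 2 for `StaticFixtureTest` (the second test sees the first test's release) but 1 and 1 for `InstanceFixtureTest`.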


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
--- End diff --

Should be `SLOT_SHARING_GROUP_ID` since it is a constant.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155768880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155758219
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
--- End diff --

nit: *leaf nodes*
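To make the inner-node / leaf-node terminology concrete, here is a heavily simplified, hypothetical model of the hierarchy described in the Javadoc: `InnerTaskSlot` plays the role of `MultiTaskSlot`, `LeafTaskSlot` that of `SingleTaskSlot`, and plain `String`s replace `SlotRequestId` and `AbstractID`. It only enforces the rule that an inner node may not hold two children with the same group id (task or co-location constraint); the root's pending `SlotContext` future is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Common base: every node has a request id and the id of the task or co-location
// constraint it represents (null for the root in this sketch).
abstract class TaskSlotNode {
    final String slotRequestId;
    final String groupId;

    TaskSlotNode(String slotRequestId, String groupId) {
        this.slotRequestId = Objects.requireNonNull(slotRequestId);
        this.groupId = groupId;
    }
}

// Leaf node: hosts exactly one task (SingleTaskSlot analogue).
final class LeafTaskSlot extends TaskSlotNode {
    LeafTaskSlot(String slotRequestId, String taskId) {
        super(slotRequestId, Objects.requireNonNull(taskId));
    }
}

// Inner node: contains other nodes, at most one per group id (MultiTaskSlot analogue).
final class InnerTaskSlot extends TaskSlotNode {
    private final Map<String, TaskSlotNode> children = new HashMap<>();

    InnerTaskSlot(String slotRequestId, String groupId) {
        super(slotRequestId, groupId);
    }

    boolean contains(String groupId) {
        return children.containsKey(groupId);
    }

    int numberOfChildren() {
        return children.size();
    }

    LeafTaskSlot allocateLeaf(String slotRequestId, String taskId) {
        return addChild(new LeafTaskSlot(slotRequestId, taskId));
    }

    // Models a co-location constraint: a nested inner node below the root.
    InnerTaskSlot allocateInner(String slotRequestId, String coLocationConstraintId) {
        return addChild(new InnerTaskSlot(slotRequestId, Objects.requireNonNull(coLocationConstraintId)));
    }

    private <T extends TaskSlotNode> T addChild(T child) {
        if (children.containsKey(child.groupId)) {
            throw new IllegalStateException("Already contains a child for " + child.groupId);
        }
        children.put(child.groupId, child);
        return child;
    }
}
```

Normal slot sharing is then a root whose children are leaves, while co-located tasks become leaves of a nested inner node; adding a second leaf for an id the root already holds throws `IllegalStateException`.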


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155754738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical slot to
+   CompletableFuture slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+ 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155751694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical slot to
+   CompletableFuture slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+ 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155604499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
--- End diff --

The variable name is confusing: `multiTaskSlotFuture` is not of type `Future`.
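A possible shape for the rename, as a hypothetical simplification of `internalAllocateSlot`: `computeIfAbsent` yields exactly one manager per slot sharing group, and the local that holds the chosen multi task slot together with its locality is named after what it is (`multiTaskSlotLocality`) rather than suffixed with `Future`. `SharingManagerStub`, `MultiTaskSlotLocalityStub`, `SlotPoolSketch`, the `String` ids and the locality labels are all assumptions made for illustration; they do not mirror Flink's real signatures or `Locality` values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for SlotSharingManager.MultiTaskSlotLocality: a plain value, not a future.
final class MultiTaskSlotLocalityStub {
    final String multiTaskSlotId;
    final String locality; // made-up label, not Flink's Locality enum

    MultiTaskSlotLocalityStub(String multiTaskSlotId, String locality) {
        this.multiTaskSlotId = Objects.requireNonNull(multiTaskSlotId);
        this.locality = Objects.requireNonNull(locality);
    }
}

// Hypothetical stand-in for SlotSharingManager, keyed by its slot sharing group id.
final class SharingManagerStub {
    final String slotSharingGroupId;

    SharingManagerStub(String slotSharingGroupId) {
        this.slotSharingGroupId = slotSharingGroupId;
    }
}

final class SlotPoolSketch {
    private final Map<String, SharingManagerStub> slotSharingManagers = new HashMap<>();

    MultiTaskSlotLocalityStub selectMultiTaskSlot(String slotSharingGroupId, String coLocationConstraintId) {
        // One manager per slot sharing group, created lazily on first use.
        final SharingManagerStub multiTaskSlotManager =
                slotSharingManagers.computeIfAbsent(slotSharingGroupId, SharingManagerStub::new);

        // Named for its type: it pairs a slot with its locality; it is not a CompletableFuture.
        final MultiTaskSlotLocalityStub multiTaskSlotLocality;
        if (coLocationConstraintId != null) {
            multiTaskSlotLocality = new MultiTaskSlotLocalityStub(
                    multiTaskSlotManager.slotSharingGroupId + "/" + coLocationConstraintId, "co-located");
        } else {
            multiTaskSlotLocality = new MultiTaskSlotLocalityStub(
                    multiTaskSlotManager.slotSharingGroupId + "/root", "unconstrained");
        }
        return multiTaskSlotLocality;
    }

    int numberOfManagers() {
        return slotSharingManagers.size();
    }
}
```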


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155605251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -0,0 +1,165 @@
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not actually a constant (it is not static).
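The convention behind this nit, sketched with hypothetical stand-ins (`PayloadSketch` and `LogicalSlotSketch` are reduced, made-up versions of `LogicalSlot.Payload` and `LogicalSlot`): a field declared in an interface is implicitly `public static final`, so it is a real constant and takes UPPER_SNAKE_CASE, while a `private final` field of the anonymous class is per-instance state and takes lowerCamelCase.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced stand-in for LogicalSlot.Payload.
interface PayloadSketch {
    CompletableFuture<?> getTerminalStateFuture();
}

interface LogicalSlotSketch {
    // Interface fields are implicitly public static final: a real constant, so UPPER_SNAKE_CASE.
    PayloadSketch TERMINATED_PAYLOAD = new PayloadSketch() {
        // Instance field of the anonymous class (not static), so lowerCamelCase.
        private final CompletableFuture<Void> completedTerminationFuture =
                CompletableFuture.completedFuture(null);

        @Override
        public CompletableFuture<?> getTerminalStateFuture() {
            return completedTerminationFuture;
        }
    };
}
```

Before Java 16 an anonymous class could not declare a `static` field unless it was a compile-time constant, so turning the future itself into a `static final` constant would mean moving it out of the anonymous class, for example onto the interface.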


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155590317
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = allocateMultiTaskSlot(
+   task.getJobVertexId(), multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException noResourceException) {
+   return FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
--- End diff --

nit: the variable name should be *leaf*.

https://www.dict.cc/?s=leaf


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155549755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -32,6 +34,20 @@
  */
 public interface LogicalSlot {
 
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not actually a constant (it is not static).


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155519870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155528224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155520946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
--- End diff --

nit: All fields are commented with non-javadoc comments. Normally comments 
on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on 
fields are displayed by IntelliJ (`Ctrl + J`).
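A short sketch of the suggested style. The class name is hypothetical, and placeholder types (`String`, `Runnable`, `Object`) stand in for `SlotSharingGroupId`, `AllocatedSlotActions` and the slot types so that it compiles on its own:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class FieldCommentStyleSketch {

    /** Identifier of the slot sharing group this manager is responsible for. */
    private final String slotSharingGroupId;

    /** Needed to release allocated slots after a complete multi task slot hierarchy has been released. */
    private final Runnable allocatedSlotActions;

    /** Root nodes which have not been completed because the allocated slot is still pending. */
    private final Map<String, Object> unresolvedRootSlots = new HashMap<>();

    FieldCommentStyleSketch(String slotSharingGroupId, Runnable allocatedSlotActions) {
        this.slotSharingGroupId = Objects.requireNonNull(slotSharingGroupId);
        this.allocatedSlotActions = Objects.requireNonNull(allocatedSlotActions);
    }

    String getSlotSharingGroupId() {
        return slotSharingGroupId;
    }

    int getNumberOfUnresolvedRootSlots() {
        return unresolvedRootSlots.size();
    }
}
```

Unlike `//` comments, `/** ... */` comments are picked up by the Javadoc tool and by IDE quick documentation.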


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
--- End diff --

nit: wrong import order (not sorted lexicographically)
```
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
```
These imports should appear before `LogicalSlot`.
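One quick way to check the expected order is to sort the import targets as plain strings. This sketch (the class name is hypothetical) uses `String`'s natural ordering only. Flink's checkstyle configuration may add grouping rules that it does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: sorts fully qualified import targets by String's natural (lexicographic) order.
class ImportOrderSketch {

    static List<String> sorted(List<String> imports) {
        List<String> copy = new ArrayList<>(imports);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // The first org.apache.flink imports, in the order they appear in the quoted diff.
        List<String> asWritten = Arrays.asList(
            "org.apache.flink.annotation.VisibleForTesting",
            "org.apache.flink.runtime.jobmaster.LogicalSlot",
            "org.apache.flink.runtime.jobmaster.SlotContext",
            "org.apache.flink.runtime.jobmaster.SlotOwner",
            "org.apache.flink.runtime.jobmaster.SlotRequestId",
            "org.apache.flink.runtime.instance.SlotSharingGroupId",
            "org.apache.flink.runtime.jobmanager.scheduler.Locality");
        sorted(asWritten).forEach(System.out::println);
    }
}
```

Sorting places `instance.SlotSharingGroupId` and `jobmanager.scheduler.Locality` before `jobmaster.LogicalSlot`: `"instance"` sorts before `"jobmaster"`, and `"jobmanager"` diverges from `"jobmaster"` at `n` < `s`.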


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507294
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot has been assigned)
+   private final 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
--- End diff --

😃 


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
+
+   }
+
+   scheduler.instanceDied(i1);
+   assertEquals(1, scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i3);
+   assertEquals(0, scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+   assertFalse(i1.isAlive());
+   assertFalse(i2.isAlive());
+   assertFalse(i3.isAlive());
+   }
+   catch (Exception e) {
--- End diff --

It would be better to propagate the exception, but I guess this file was copy-pasted.
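The suggested pattern can be sketched without JUnit as follows. `InstanceRegistrySketch` is a hypothetical stand-in for the `Scheduler`, and `expectThrows` is a hand-written helper, not a Flink or JUnit API. In a JUnit 4 test the method would simply be declared `throws Exception`, so an unexpected exception fails the test with its original stack trace instead of going through a catch-all block around the whole test body.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the Scheduler: tracks alive and dead instances by name.
class InstanceRegistrySketch {
    private final Set<String> alive = new HashSet<>();
    private final Set<String> dead = new HashSet<>();

    void newInstanceAvailable(String instance) {
        if (alive.contains(instance) || dead.contains(instance)) {
            throw new IllegalArgumentException("Instance already known: " + instance);
        }
        alive.add(instance);
    }

    void instanceDied(String instance) {
        if (alive.remove(instance)) {
            dead.add(instance);
        }
    }

    int getNumberOfAvailableInstances() {
        return alive.size();
    }
}

class PropagatingTestSketch {

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Passes only if the action throws the expected type; any other exception propagates unchanged.
    static void expectThrows(Class<? extends Exception> expected, ThrowingRunnable action) throws Exception {
        try {
            action.run();
        } catch (Exception e) {
            if (expected.isInstance(e)) {
                return;
            }
            throw e;
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " to be thrown");
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // No catch-all block: unexpected exceptions simply propagate to the caller (or test runner).
    static void testAddAndRemoveInstance() throws Exception {
        InstanceRegistrySketch scheduler = new InstanceRegistrySketch();
        scheduler.newInstanceAvailable("i1");
        scheduler.newInstanceAvailable("i2");
        check(scheduler.getNumberOfAvailableInstances() == 2, "two instances expected");

        // cannot add an available instance again
        expectThrows(IllegalArgumentException.class, () -> scheduler.newInstanceAvailable("i2"));

        // cannot add a dead instance either
        scheduler.instanceDied("i2");
        expectThrows(IllegalArgumentException.class, () -> scheduler.newInstanceAvailable("i2"));
        check(scheduler.getNumberOfAvailableInstances() == 1, "one instance expected");
    }
}
```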


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155502971
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java
 ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Simple {@link AllocatedSlotActions} implementations for testing purposes.
+ */
+public class TestingAllocatedSlotActions implements AllocatedSlotActions {
+
+   private volatile Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer;
+
+   public void setReleaseSlotConsumer(Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer) {
+   this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer);
+   }
+
+   @Override
+   public CompletableFuture releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) {
+   Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer;
+
+   if (currentReleaseSlotConsumer != null) {
+   currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, cause));
--- End diff --

nit: whitespace after `cause`
```
... cause   ));
```


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5091

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

## What is the purpose of the change

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request does not
have to be completed right away but can be completed at a later point in
time. This makes it possible to start new TaskExecutors in case there are
no more slots left.
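A minimal, single-threaded sketch of what queued scheduling means for a slot request. The class and method names are hypothetical, slots are plain strings, and `IllegalStateException` stands in for `NoResourceAvailableException`. The `allowQueuedScheduling` flag mirrors the parameter of the same name in the quoted diff, but the logic is illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical, single-threaded sketch; slots are identified by plain strings.
class QueuedSlotPoolSketch {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final Deque<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();

    CompletableFuture<String> requestSlot(boolean allowQueuedScheduling) {
        String slot = freeSlots.poll();
        if (slot != null) {
            return CompletableFuture.completedFuture(slot);
        }
        CompletableFuture<String> request = new CompletableFuture<>();
        if (allowQueuedScheduling) {
            // Not completed now: a slot offered later (e.g. by a newly started TaskExecutor) completes it.
            pendingRequests.add(request);
        } else {
            request.completeExceptionally(new IllegalStateException("No slot available"));
        }
        return request;
    }

    // Called when a new slot becomes available; serves the oldest pending request first.
    void offerSlot(String slot) {
        CompletableFuture<String> pending = pendingRequests.poll();
        if (pending != null) {
            pending.complete(slot);
        } else {
            freeSlots.add(slot);
        }
    }
}
```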

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager internally maintains a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed, any pending LogicalSlot
instantiations are executed and sent to the slot requester.
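The future-based resolution described above can be sketched with `CompletableFuture`. The root holds the future of the slot context, and every logical slot requested before it completes is derived from it with `thenApply`. The names are hypothetical and the slot context is modeled as a `String`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical root node; the slot context of the allocated slot is modeled as a String.
class RootSlotSketch {
    private final CompletableFuture<String> slotContextFuture = new CompletableFuture<>();

    // Callable before the allocated slot is known; the mapping runs once the context arrives.
    CompletableFuture<String> allocateLogicalSlot(String taskId) {
        return slotContextFuture.thenApply(slotContext -> slotContext + "/" + taskId);
    }

    // Completing the context future triggers every pending thenApply stage.
    void resolve(String slotContext) {
        slotContextFuture.complete(slotContext);
    }
}
```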

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.
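A sketch of that hierarchy with hypothetical `*Sketch` classes. Each inner node keeps at most one child per id (the task's or the co-location constraint's `AbstractID`, modeled as a `String`), which is the same condition the `contains(task.getJobVertexId())` sanity check in the quoted diff asserts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: every node carries the id of the task or co-location group it serves.
abstract class TaskSlotSketch {
    final String groupId;

    TaskSlotSketch(String groupId) {
        this.groupId = groupId;
    }
}

// Leaf node: hosts exactly one task.
final class SingleTaskSlotSketch extends TaskSlotSketch {
    SingleTaskSlotSketch(String groupId) {
        super(groupId);
    }
}

// Inner node: hosts several children, but at most one per group id.
final class MultiTaskSlotSketch extends TaskSlotSketch {
    private final Map<String, TaskSlotSketch> children = new HashMap<>();

    MultiTaskSlotSketch(String groupId) {
        super(groupId);
    }

    boolean contains(String groupId) {
        return children.containsKey(groupId);
    }

    SingleTaskSlotSketch allocateSingleTaskSlot(String groupId) {
        return addChild(new SingleTaskSlotSketch(groupId));
    }

    MultiTaskSlotSketch allocateMultiTaskSlot(String groupId) {
        return addChild(new MultiTaskSlotSketch(groupId));
    }

    private <T extends TaskSlotSketch> T addChild(T child) {
        if (children.putIfAbsent(child.groupId, child) != null) {
            throw new IllegalStateException("Already contains a child for " + child.groupId);
        }
        return child;
    }
}
```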

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestId is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.
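The co-location lookup can be sketched like this. All names are hypothetical, the root `MultiTaskSlot` is omitted, and a random UUID stands in for a `SlotRequestId`. The first task of a constraint creates the nested node and records its request id on the constraint; later tasks with the same constraint resolve the node through that id.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical constraint: remembers the request id of the node its tasks share.
class CoLocationConstraintSketch {
    String slotRequestId; // null until the first co-located task has been placed
}

// Hypothetical nested inner node that hosts all tasks of one constraint.
class CoLocatedNodeSketch {
    final String slotRequestId = UUID.randomUUID().toString();
    final Set<String> taskIds = new HashSet<>();
}

class CoLocationManagerSketch {
    // All nested co-location nodes, indexed by their request id.
    private final Map<String, CoLocatedNodeSketch> nodesByRequestId = new HashMap<>();

    // Returns the request id of the node the task was placed into.
    String allocate(CoLocationConstraintSketch constraint, String taskId) {
        CoLocatedNodeSketch node =
            constraint.slotRequestId == null ? null : nodesByRequestId.get(constraint.slotRequestId);
        if (node == null) {
            // First task of this constraint: create the nested node and record its id on the constraint.
            node = new CoLocatedNodeSketch();
            nodesByRequestId.put(node.slotRequestId, node);
            constraint.slotRequestId = node.slotRequestId;
        }
        if (!node.taskIds.add(taskId)) {
            throw new IllegalStateException("Task " + taskId + " is already placed in this node");
        }
        return node.slotRequestId;
    }
}
```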

This PR also moves the `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`.

This PR is based on #5090 

## Brief change log

- Add `SlotSharingManager` to manage shared slots
- Rework `SlotPool` to use `SlotSharingManager`
- Add `SlotPool#allocateMultiTaskSlot` to allocate a shared slot
- Add `SlotPool#allocateCoLocatedMultiTaskSlot` to allocate a co-located slot
- Move `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`

## Verifying this change

- Port `SchedulerSlotSharingTest`, `SchedulerIsolatedTasksTest` and
`ScheduleWithCoLocationHintTest` to run with `SlotPool`
- Add `SlotSharingManagerTest`, `SlotPoolSlotSharingTest` and
`SlotPoolCoLocationTest` 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotPoolSlots

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5091


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples task deployment from the actual slot
implementation, which at the moment consists of Slot, SimpleSlot and
SharedSlot. This is a helpful step toward introducing a different slot
implementation for FLIP-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot
from the Execution by introducing a Payload interface, which is set for a
LogicalSlot. The Payload interface is implemented by the Execution and
allows failing the payload and obtaining a termination future.

Introduce a proper Execution#releaseFuture which is completed once the
Execution's assigned resource has been released.
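The decoupling via a payload interface can be sketched as follows. `SlotPayload`, `PayloadSlot` and `FakeExecution` are hypothetical names modeled on the commit description (a way to fail the payload plus a termination future); they are not Flink's actual signatures. The slot only knows its payload through the interface, so it can fail whatever is running in it without depending on Execution.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical payload interface modeled on the commit description.
interface SlotPayload {
    /** Fails the payload, e.g. because its slot is being released. */
    void fail(Throwable cause);

    /** Completes once the payload has reached a terminal state. */
    CompletableFuture<Void> getTerminationFuture();
}

// Hypothetical logical slot that knows its payload only through SlotPayload.
final class PayloadSlot {
    private final AtomicReference<SlotPayload> payload = new AtomicReference<>();

    /** Assigns the payload; returns false if a payload was already set. */
    boolean tryAssignPayload(SlotPayload newPayload) {
        return payload.compareAndSet(null, newPayload);
    }

    /** Fails the current payload and returns a future completed once it has terminated. */
    CompletableFuture<Void> releaseSlot(Throwable cause) {
        SlotPayload current = payload.get();
        if (current == null) {
            return CompletableFuture.completedFuture(null);
        }
        current.fail(cause);
        // The slot is considered free once its payload has reached a terminal state.
        return current.getTerminationFuture();
    }
}

// Hypothetical stand-in for Execution: terminates immediately when failed.
final class FakeExecution implements SlotPayload {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    volatile Throwable failureCause;

    @Override
    public void fail(Throwable cause) {
        failureCause = cause;
        terminationFuture.complete(null);
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}
```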

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, an abstraction which provides the
SimpleSlot with the slot information it needs to communicate with the
TaskManager without relying on the AllocatedSlot. The AllocatedSlot is now
only used by the SlotPool.
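The effect of this decoupling can be sketched with a narrow interface. `SlotContextView`, `PoolAllocatedSlot` and `DeploymentSlot` are hypothetical names, and the two accessors are assumptions chosen for illustration: the deployment-side slot depends only on the interface, while pool-internal bookkeeping stays on the allocated-slot class.

```java
// Hypothetical narrow view of a slot: only what deployment needs to reach the TaskManager.
interface SlotContextView {
    String getTaskManagerAddress();

    int getPhysicalSlotNumber();
}

// Hypothetical stand-in for the pool-internal AllocatedSlot with extra pool-only state.
final class PoolAllocatedSlot implements SlotContextView {
    private final String taskManagerAddress;
    private final int physicalSlotNumber;
    private boolean inUse; // pool bookkeeping, invisible through SlotContextView

    PoolAllocatedSlot(String taskManagerAddress, int physicalSlotNumber) {
        this.taskManagerAddress = taskManagerAddress;
        this.physicalSlotNumber = physicalSlotNumber;
    }

    @Override
    public String getTaskManagerAddress() {
        return taskManagerAddress;
    }

    @Override
    public int getPhysicalSlotNumber() {
        return physicalSlotNumber;
    }

    void setInUse(boolean inUse) {
        this.inUse = inUse;
    }

    boolean isInUse() {
        return inUse;
    }
}

// Hypothetical deployment-side slot that depends only on the narrow interface.
final class DeploymentSlot {
    private final SlotContextView context;

    DeploymentSlot(SlotContextView context) {
        this.context = context;
    }

    String describeTarget() {
        return context.getTaskManagerAddress() + "#" + context.getPhysicalSlotNumber();
    }
}
```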

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088]