[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-25 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228148052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 ##
 @@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
/**
 * Executor which executes runnables in the main thread context.
 */
-   protected static class MainThreadExecutor implements Executor {
+   protected static class MainThreadExecutor implements MainThreadExecutable {
 
 Review comment:
   I was also thinking about that. Are you suggesting replacing the current
methods with the ones from `ScheduledExecutor`, or adding them on top of the
existing methods?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-25 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228147167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the requested slot
 * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   In general, I like the idea, but I wonder how this is easily possible. For
this method, I agree, but the interface also has `requestNewAllocatedSlot`,
which also exposes `AllocatedSlot` and is typically followed by attempts to
assign a payload.




[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-25 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228066391
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-   // allocate slot with slot sharing
-   final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-   slotSharingGroupId,
-   id -> new SlotSharingManager(
-   id,
-   this,
-   providerAndOwner));
-
-   final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-   try {
-   if (task.getCoLocationConstraint() != null) {
-   multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-   task.getCoLocationConstraint(),
-   multiTaskSlotManager,
-   slotProfile,
-   allowQueuedScheduling,
-   allocationTimeout);
+   return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   In the end, yes, absolutely right. But this is currently on purpose, and the
same applies to some of your other comments noting that there is still slot
pool code around that could be removed. The purpose is that you can now switch
back and forth between the new and the old code path by changing a single line
in the job master, which keeps the change as incremental as possible, as we
previously discussed.




[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   In theory: it is a `Collection`, and `contains` on a collection can be
expensive, depending on the implementation. A hash set offers ideal performance
for `contains`.
   In practice, this collection is currently always of size 1, so either way
it should not matter much. But as the code could evolve, I prefer not to create
a potential complexity bomb :)
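
   A minimal sketch of the trade-off described above, not the code from the
pull request: slot and allocation ids are modelled as plain strings, and the
class and method names (`PriorAllocationLookup`, `firstPreviouslyAllocated`)
are hypothetical. Copying the collection into a `HashSet` once makes every
later `contains` call an expected constant-time lookup, whatever collection
type the caller passed in.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration; ids are simplified to strings.
final class PriorAllocationLookup {

    /** Returns the first available slot id that is also a prior allocation, if any. */
    static Optional<String> firstPreviouslyAllocated(
            List<String> availableSlotIds,
            Collection<String> priorAllocations) {

        // Copy once: contains() on an arbitrary Collection (e.g. a List) may be
        // linear in its size, while HashSet.contains() is expected constant time.
        Set<String> priorAllocationsSet = new HashSet<>(priorAllocations);

        for (String slotId : availableSlotIds) {
            if (priorAllocationsSet.contains(slotId)) {
                return Optional.of(slotId);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> available = Arrays.asList("a1", "a2", "a3");
        System.out.println(firstPreviouslyAllocated(available, Arrays.asList("a3", "a9")));
    }
}
```

   With a single prior allocation the copy costs about as much as one linear
scan, so the benefit only shows up if the collection grows.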




[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446372
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Will do. Overall, this construct that uses `null` will go away once we
introduce group scheduling.




[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-10-23 Thread GitBox
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   The point is that `null` and `emptySet` have different meanings here: `null`
means it was not previously computed/provided and has to be determined inside
the execution. `emptySet` means it has been provided and the result was just
empty. Overall, this whole construct should go away very soon when we move to
group scheduling.
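
   To illustrate the distinction, here is a small sketch under simplifying
assumptions: allocation ids are plain strings, and `PreviousAllocationIds`,
`resolve`, and `computeFromGraph` are hypothetical names standing in for the
logic inside the execution, not the actual code of this pull request.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of "null = not provided" vs. "empty = provided, but empty".
final class PreviousAllocationIds {

    // Stand-in for the computation that the execution performs itself
    // when the caller did not supply the ids.
    static Set<String> computeFromGraph() {
        return new HashSet<>(Collections.singleton("alloc-1"));
    }

    static Set<String> resolve(Set<String> provided) {
        // null  -> the caller did not compute the ids, so determine them here
        // empty -> the caller computed them and found none, so use the set as-is
        return provided != null ? provided : computeFromGraph();
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve(Collections.<String>emptySet()));
    }
}
```

   Passing an empty set instead of `null` would therefore skip the computation
and silently change the behaviour.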

