[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228148052

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ##
@@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
     /**
      * Executor which executes runnables in the main thread context.
      */
-    protected static class MainThreadExecutor implements Executor {
+    protected static class MainThreadExecutor implements MainThreadExecutable {

Review comment:
I was also thinking about that. Are you suggesting to replace the current methods with the ones from `ScheduledExecutor`, or to add them on top of the existing methods?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228147167

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##
@@ -145,17 +149,61 @@
      * If the returned future must not be completed right away (a.k.a. the slot request
      * can be queued), allowQueuedScheduling must be set to true.
      *
+     * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+     * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+     * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+     * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+     *
      * @param slotRequestId identifying the requested slot
      * @param scheduledUnit for which to allocate slot
      * @param slotProfile profile that specifies the requirements for the requested slot
      * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
      * @param timeout for the operation
      * @return Future which is completed with the allocated {@link LogicalSlot}
      */
+    @Deprecated
     CompletableFuture allocateSlot(
         SlotRequestId slotRequestId,
         ScheduledUnit scheduledUnit,
         SlotProfile slotProfile,
         boolean allowQueuedScheduling,
         @RpcTimeout Time timeout);
+
+    /**
+     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+     * pool.
+     *
+     * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+     */
+    @Nonnull
+    List getAvailableSlotsInformation();
+
+    /**
+     * Allocates the available slot with the given allocation id under the given request id. This method returns
+     * {@code null} if no slot with the given allocation id is available.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param allocationID the allocation id of the requested available slot
+     * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+     */
+    @Nullable
+    AllocatedSlot allocateAvailableSlot(

Review comment:
In general, I like the idea, but I wonder how this is easily possible. For these methods, I agree, but the interface also has `requestNewAllocatedSlot`, which also exposes `AllocatedSlot` and is typically followed by attempts to assign a payload.
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228066391

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##
@@ -324,78 +297,98 @@ public void disconnectResourceManager() {
             boolean allowQueuedScheduling,
             Time allocationTimeout) {

-        log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-        final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-        if (slotSharingGroupId != null) {
-            // allocate slot with slot sharing
-            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-                slotSharingGroupId,
-                id -> new SlotSharingManager(
-                    id,
-                    this,
-                    providerAndOwner));
-
-            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-            try {
-                if (task.getCoLocationConstraint() != null) {
-                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                        task.getCoLocationConstraint(),
-                        multiTaskSlotManager,
-                        slotProfile,
-                        allowQueuedScheduling,
-                        allocationTimeout);
+        return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {

Review comment:
In the end, yes, absolutely right. But this is currently on purpose, as is the point raised in some of your other comments that there is still slot pool code around that could be removed. The purpose is that you can now switch back and forth between the new and the old code path by changing a single line in the job master, and that the change stays as incremental as possible, as we previously discussed.
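The added line in the hunk above wraps the method body in `CompletableFuture.completedFuture(null).thenComposeAsync(...)`. The diff is cut off before any second argument, so the following is only a sketch of what that JDK idiom does when it is given an executor: the body runs as a task on that executor rather than on the calling thread. The class name `ComposeOnExecutor`, the method `allocate`, and the thread name are hypothetical and do not come from the pull request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; not code from the pull request.
final class ComposeOnExecutor {

    /** Runs the composed body on the given executor and reports which thread ran it. */
    static CompletableFuture<String> allocate(Executor mainThreadExecutor) {
        return CompletableFuture.<Void>completedFuture(null)
            .thenComposeAsync(
                // The lambda is submitted to mainThreadExecutor, not run by the caller.
                ignored -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                mainThreadExecutor);
    }

    public static void main(String[] args) {
        // A single named thread stands in for an endpoint's main thread (an assumption).
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        try {
            System.out.println(allocate(executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the whole body sits inside one lambda, the caller sees the same `CompletableFuture` return type regardless of which thread does the work.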
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+    public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+        new PreviousAllocationSlotSelectionStrategy();
+
+    private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+    private PreviousAllocationSlotSelectionStrategy() {
+        this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+    }
+
+    @Nonnull
+    @Override
+    public SlotInfoAndLocality selectBestSlotForProfile(
+        @Nonnull List availableSlots,
+        @Nonnull SlotProfile slotProfile) {
+
+        Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+        // First, if there was a prior allocation try to schedule to the same/old slot
+        if (!priorAllocations.isEmpty()) {
+
+            HashSet priorAllocationsSet = new HashSet<>(priorAllocations);

Review comment:
In theory: it is a collection, and `contains` on a collection can be expensive, depending on the implementation. A hash set offers ideal performance for `contains`. In practice, this collection is currently always of size 1, so either way it should not matter much. But as the code could evolve, I prefer not to create a potential complexity bomb :)
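The reasoning in this comment can be sketched in plain Java: copy the caller's `Collection` into a `HashSet` once, so that every later `contains` is an expected constant-time hash lookup no matter which implementation was passed in. This is a minimal illustration only; `PriorAllocationLookup`, `matchPrior`, and the use of `String` in place of `AllocationID` are assumptions, not code from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; String stands in for AllocationID.
final class PriorAllocationLookup {

    /** Returns the candidates that also appear in priorAllocations, in candidate order. */
    static List<String> matchPrior(Collection<String> priorAllocations, List<String> candidates) {
        // One linear copy up front; a List-backed Collection would otherwise
        // scan all its elements on every contains() call in the loop below.
        Set<String> priorSet = new HashSet<>(priorAllocations);
        List<String> matches = new ArrayList<>();
        for (String candidate : candidates) {
            if (priorSet.contains(candidate)) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Collection<String> prior = Arrays.asList("alloc-2");
        List<String> available = Arrays.asList("alloc-1", "alloc-2", "alloc-3");
        System.out.println(matchPrior(prior, available));
    }
}
```

With a prior collection of size 1, as the comment notes is currently the case, the copy costs almost nothing; it only pays off if the collection grows or its implementation changes.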
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446372

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ##
@@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
             SlotProvider slotProvider,
             boolean queued,
             LocationPreferenceConstraint locationPreferenceConstraint,
+            @Nullable Set allPreviousExecutionGraphAllocationIds,

Review comment:
Will do. Overall, this construct that uses `null` will go away once we introduce group scheduling.
[GitHub] StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ##
@@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
         return scheduleForExecution(
             resourceProvider,
             allowQueued,
-            LocationPreferenceConstraint.ANY);
+            LocationPreferenceConstraint.ANY,
+            null);

Review comment:
The point is that `null` and `emptySet` have different meanings here: `null` means it was not previously computed/provided and has to be determined inside the execution. `emptySet` means it has been provided and the result was just empty. Overall, this whole construct should go away very soon when we move to group scheduling.
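The distinction drawn in this comment can be shown with a small sketch: a `null` argument triggers the computation, while an empty set is taken as an already computed, empty answer. Everything named here (`PreviousAllocationIds`, `resolve`, `computeFromGraph`, and `String` in place of `AllocationID`) is hypothetical and only illustrates the convention; it is not the logic in `Execution.java`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the null-versus-empty convention.
final class PreviousAllocationIds {

    /** Stand-in for the work done "inside the execution" when nothing was provided. */
    static Set<String> computeFromGraph() {
        return new HashSet<>(Arrays.asList("alloc-1", "alloc-2"));
    }

    /**
     * Returns the ids to use: null means "not provided, compute them now",
     * while any non-null set (including an empty one) is used as given.
     */
    static Set<String> resolve(Set<String> provided) {
        if (provided == null) {
            return computeFromGraph();
        }
        return provided;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null).size());
        System.out.println(resolve(Collections.<String>emptySet()).size());
    }
}
```

Replacing `null` with `Collections.emptySet()` at the call site would therefore change behavior under this convention, since the callee would skip the computation.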