This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9aee0c0fb2121207bfdd7f824b61ff079b8ff884 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Fri Jun 5 18:42:43 2020 +0800 [FLINK-17017][runtime] SchedulerImpl supports bulk slot allocation --- .../runtime/jobmaster/slotpool/SchedulerImpl.java | 14 ++++++++ .../runtime/jobmaster/slotpool/SlotProvider.java | 38 +++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java index f7a518f..af87e9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java @@ -77,6 +77,8 @@ public class SchedulerImpl implements Scheduler { @Nonnull private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers; + private final BulkSlotProvider bulkSlotProvider; + public SchedulerImpl( @Nonnull SlotSelectionStrategy slotSelectionStrategy, @Nonnull SlotPool slotPool) { @@ -95,11 +97,15 @@ public class SchedulerImpl implements Scheduler { this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor( "Scheduler is not initialized with proper main thread executor. " + "Call to Scheduler.start(...) required."); + + this.bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool); } @Override public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) { this.componentMainThreadExecutor = mainThreadExecutor; + + bulkSlotProvider.start(mainThreadExecutor); } //--------------------------- @@ -560,4 +566,12 @@ public class SchedulerImpl implements Scheduler { public boolean requiresPreviousExecutionGraphAllocations() { return slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy; } + + @Override + public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots( + final Collection<PhysicalSlotRequest> physicalSlotRequests, + final Time timeout) { + + return bulkSlotProvider.allocatePhysicalSlots(physicalSlotRequests, timeout); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java index 36da2c7..747a6b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -27,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import javax.annotation.Nullable; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -40,7 +42,16 @@ import java.util.concurrent.CompletableFuture; * fulfilled as soon as a slot becomes available.</li> * </ul> */ -public interface SlotProvider { +public interface SlotProvider extends BulkSlotProvider { + + /** + * Starts the slot provider by initializing the main thread executor. + * + * @param mainThreadExecutor the main thread executor of the job master + */ + default void start(ComponentMainThreadExecutor mainThreadExecutor) { + throw new UnsupportedOperationException("Not properly implemented."); + } /** * Allocating slot with specific requirement. @@ -92,6 +103,20 @@ public interface SlotProvider { } /** + * Allocates a bulk of physical slots. The allocation will be completed + * normally only when all the requests are fulfilled. + * + * @param physicalSlotRequests requests for physical slots + * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable + * @return future of the results of slot requests + */ + default CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots( + Collection<PhysicalSlotRequest> physicalSlotRequests, + Time timeout) { + throw new UnsupportedOperationException("Not properly implemented."); + } + + /** * Cancels the slot request with the given {@link SlotRequestId} and {@link SlotSharingGroupId}. * * @param slotRequestId identifying the slot request to cancel @@ -102,4 +127,15 @@ public interface SlotProvider { SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause); + + /** + * Cancels the slot request with the given {@link SlotRequestId}. If the request is already fulfilled + * with a physical slot, the slot will be released. + * + * @param slotRequestId identifying the slot request to cancel + * @param cause of the cancellation + */ + default void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) { + cancelSlotRequest(slotRequestId, null, cause); + } }