This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9aee0c0fb2121207bfdd7f824b61ff079b8ff884
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Fri Jun 5 18:42:43 2020 +0800

    [FLINK-17017][runtime] SchedulerImpl supports bulk slot allocation
---
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  | 14 ++++++++
 .../runtime/jobmaster/slotpool/SlotProvider.java   | 38 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index f7a518f..af87e9c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -77,6 +77,8 @@ public class SchedulerImpl implements Scheduler {
        @Nonnull
        private final Map<SlotSharingGroupId, SlotSharingManager> 
slotSharingManagers;
 
+       private final BulkSlotProvider bulkSlotProvider;
+
        public SchedulerImpl(
                @Nonnull SlotSelectionStrategy slotSelectionStrategy,
                @Nonnull SlotPool slotPool) {
@@ -95,11 +97,15 @@ public class SchedulerImpl implements Scheduler {
                this.componentMainThreadExecutor = new 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
                        "Scheduler is not initialized with proper main thread 
executor. " +
                                "Call to Scheduler.start(...) required.");
+
+               this.bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
        }
 
        @Override
        public void start(@Nonnull ComponentMainThreadExecutor 
mainThreadExecutor) {
                this.componentMainThreadExecutor = mainThreadExecutor;
+
+               bulkSlotProvider.start(mainThreadExecutor);
        }
 
        //---------------------------
@@ -560,4 +566,12 @@ public class SchedulerImpl implements Scheduler {
        public boolean requiresPreviousExecutionGraphAllocations() {
                return slotSelectionStrategy instanceof 
PreviousAllocationSlotSelectionStrategy;
        }
+
+       @Override
+       public CompletableFuture<Collection<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+                       final Collection<PhysicalSlotRequest> 
physicalSlotRequests,
+                       final Time timeout) {
+
+               return 
bulkSlotProvider.allocatePhysicalSlots(physicalSlotRequests, timeout);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
index 36da2c7..747a6b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -27,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -40,7 +42,16 @@ import java.util.concurrent.CompletableFuture;
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
  */
-public interface SlotProvider {
+public interface SlotProvider extends BulkSlotProvider {
+
+       /**
+        * Starts the slot provider by initializing the main thread executor.
+        *
+        * @param mainThreadExecutor the main thread executor of the job master
+        */
+       default void start(ComponentMainThreadExecutor mainThreadExecutor) {
+               throw new UnsupportedOperationException("Not properly 
implemented.");
+       }
 
        /**
         * Allocating slot with specific requirement.
@@ -92,6 +103,20 @@ public interface SlotProvider {
        }
 
        /**
+        * Allocates a bulk of physical slots. The allocation will be completed
+        * normally only when all the requests are fulfilled.
+        *
+        * @param physicalSlotRequests requests for physical slots
+        * @param timeout indicating how long it is accepted that the slot 
requests can be unfulfillable
+        * @return future of the results of slot requests
+        */
+       default CompletableFuture<Collection<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+               Collection<PhysicalSlotRequest> physicalSlotRequests,
+               Time timeout) {
+               throw new UnsupportedOperationException("Not properly 
implemented.");
+       }
+
+       /**
         * Cancels the slot request with the given {@link SlotRequestId} and 
{@link SlotSharingGroupId}.
         *
         * @param slotRequestId identifying the slot request to cancel
@@ -102,4 +127,15 @@ public interface SlotProvider {
                SlotRequestId slotRequestId,
                @Nullable SlotSharingGroupId slotSharingGroupId,
                Throwable cause);
+
+       /**
+        * Cancels the slot request with the given {@link SlotRequestId}. If 
the request is already fulfilled
+        * with a physical slot, the slot will be released.
+        *
+        * @param slotRequestId identifying the slot request to cancel
+        * @param cause of the cancellation
+        */
+       default void cancelSlotRequest(SlotRequestId slotRequestId, Throwable 
cause) {
+               cancelSlotRequest(slotRequestId, null, cause);
+       }
 }

Reply via email to