zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r483462252



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -95,12 +97,15 @@
 
        private final Set<ExecutionVertexID> verticesWaitingForRestart;
 
+       private final Consumer<ComponentMainThreadExecutor> slartUpAction;

Review comment:
       `slartUpAction` -> `startUpAction `

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
##########
@@ -62,11 +51,13 @@ default void start(ComponentMainThreadExecutor 
mainThreadExecutor) {
         * @param allocationTimeout after which the allocation fails with a 
timeout exception
         * @return The future of the allocation
         */
-       CompletableFuture<LogicalSlot> allocateSlot(
+       default CompletableFuture<LogicalSlot> allocateSlot(
                SlotRequestId slotRequestId,
                ScheduledUnit scheduledUnit,
                SlotProfile slotProfile,
-               Time allocationTimeout);
+               Time allocationTimeout) {

Review comment:
       Looks to me default body is not needed. I tried removing the default 
body and flink-runtime still compiles.
   Correct me if I miss anything.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
##########
@@ -125,24 +153,50 @@ static SchedulingStrategyFactory 
createSchedulingStrategyFactory(final ScheduleM
                }
        }
 
-       private static ExecutionSlotAllocatorFactory 
createExecutionSlotAllocatorFactory(
+       private static DefaultSchedulerComponents 
createPipelinedRegionSchedulerComponents(
                        final ScheduleMode scheduleMode,
-                       final SlotProvider slotProvider,
-                       final Time slotRequestTimeout,
-                       final SchedulingStrategyFactory 
schedulingStrategyFactory) {
-
-               if (schedulingStrategyFactory instanceof 
PipelinedRegionSchedulingStrategy.Factory) {
-                       return new OneSlotPerExecutionSlotAllocatorFactory(
-                               slotProvider,
-                               scheduleMode != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
-                               slotRequestTimeout);
-               } else {
-                       final SlotProviderStrategy slotProviderStrategy = 
SlotProviderStrategy.from(
-                               scheduleMode,
-                               slotProvider,
-                               slotRequestTimeout);
-
-                       return new 
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+                       final Configuration jobMasterConfiguration,
+                       final SlotPool slotPool,
+                       final Time slotRequestTimeout) {
+               final SlotSelectionStrategy slotSelectionStrategy = 
selectSlotSelectionStrategy(jobMasterConfiguration);
+               final BulkSlotProvider bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+               final ExecutionSlotAllocatorFactory allocatorFactory = new 
OneSlotPerExecutionSlotAllocatorFactory(
+                       bulkSlotProvider,
+                       scheduleMode != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                       slotRequestTimeout);
+               return new DefaultSchedulerComponents(
+                       new PipelinedRegionSchedulingStrategy.Factory(),
+                       bulkSlotProvider::start,
+                       allocatorFactory);
+       }
+
+       @Nonnull
+       private static SlotSelectionStrategy 
selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
+               final boolean evenlySpreadOutSlots = 
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+               final SlotSelectionStrategy 
locationPreferenceSlotSelectionStrategy;
+
+               locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ?
+                       
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() :
+                       LocationPreferenceSlotSelectionStrategy.createDefault();
+
+               return 
configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ?
+                       
PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy)
 :
+                       locationPreferenceSlotSelectionStrategy;
+       }
+
+       private static class DefaultSchedulerComponents {
+               private final SchedulingStrategyFactory 
schedulingStrategyFactory;
+               private final Consumer<ComponentMainThreadExecutor> 
slartUpAction;

Review comment:
       `startUpAction` -> `startUpAction`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
##########
@@ -125,24 +153,50 @@ static SchedulingStrategyFactory 
createSchedulingStrategyFactory(final ScheduleM
                }
        }
 
-       private static ExecutionSlotAllocatorFactory 
createExecutionSlotAllocatorFactory(
+       private static DefaultSchedulerComponents 
createPipelinedRegionSchedulerComponents(
                        final ScheduleMode scheduleMode,
-                       final SlotProvider slotProvider,
-                       final Time slotRequestTimeout,
-                       final SchedulingStrategyFactory 
schedulingStrategyFactory) {
-
-               if (schedulingStrategyFactory instanceof 
PipelinedRegionSchedulingStrategy.Factory) {
-                       return new OneSlotPerExecutionSlotAllocatorFactory(
-                               slotProvider,
-                               scheduleMode != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
-                               slotRequestTimeout);
-               } else {
-                       final SlotProviderStrategy slotProviderStrategy = 
SlotProviderStrategy.from(
-                               scheduleMode,
-                               slotProvider,
-                               slotRequestTimeout);
-
-                       return new 
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+                       final Configuration jobMasterConfiguration,
+                       final SlotPool slotPool,
+                       final Time slotRequestTimeout) {
+               final SlotSelectionStrategy slotSelectionStrategy = 
selectSlotSelectionStrategy(jobMasterConfiguration);
+               final BulkSlotProvider bulkSlotProvider = new 
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+               final ExecutionSlotAllocatorFactory allocatorFactory = new 
OneSlotPerExecutionSlotAllocatorFactory(
+                       bulkSlotProvider,
+                       scheduleMode != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                       slotRequestTimeout);
+               return new DefaultSchedulerComponents(
+                       new PipelinedRegionSchedulingStrategy.Factory(),
+                       bulkSlotProvider::start,
+                       allocatorFactory);
+       }
+
+       @Nonnull

Review comment:
       We can remove the `@Nonnull` annotation I think.
   This is because that we will by default assume it to be non-null, 
   and only `@Nullable` annotations are required when needed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
##########
@@ -102,31 +93,19 @@ default void start(ComponentMainThreadExecutor 
mainThreadExecutor) {
                        allocationTimeout);
        }
 
-       /**
-        * Allocates a bulk of physical slots. The allocation will be completed
-        * normally only when all the requests are fulfilled.
-        *
-        * @param physicalSlotRequests requests for physical slots
-        * @param timeout indicating how long it is accepted that the slot 
requests can be unfulfillable
-        * @return future of the results of slot requests
-        */
-       default CompletableFuture<Collection<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
-               Collection<PhysicalSlotRequest> physicalSlotRequests,
-               Time timeout) {
-               throw new UnsupportedOperationException("Not properly 
implemented.");
-       }
-
        /**
         * Cancels the slot request with the given {@link SlotRequestId} and 
{@link SlotSharingGroupId}.
         *
         * @param slotRequestId identifying the slot request to cancel
         * @param slotSharingGroupId identifying the slot request to cancel
         * @param cause of the cancellation
         */
-       void cancelSlotRequest(
-               SlotRequestId slotRequestId,
-               @Nullable SlotSharingGroupId slotSharingGroupId,
-               Throwable cause);
+       default void cancelSlotRequest(

Review comment:
       Looks to me default body is not needed. I tried removing the default 
body and flink-runtime still compiles.
   Correct me if I miss anything.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to