zhuzhurk commented on a change in pull request #13181: URL: https://github.com/apache/flink/pull/13181#discussion_r483462252
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ########## @@ -95,12 +97,15 @@ private final Set<ExecutionVertexID> verticesWaitingForRestart; + private final Consumer<ComponentMainThreadExecutor> slartUpAction; Review comment: `slartUpAction` -> `startUpAction ` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ########## @@ -62,11 +51,13 @@ default void start(ComponentMainThreadExecutor mainThreadExecutor) { * @param allocationTimeout after which the allocation fails with a timeout exception * @return The future of the allocation */ - CompletableFuture<LogicalSlot> allocateSlot( + default CompletableFuture<LogicalSlot> allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - Time allocationTimeout); + Time allocationTimeout) { Review comment: Looks to me default body is not needed. I tried removing the default body and flink-runtime still compiles. Correct me if I miss anything. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ########## @@ -125,24 +153,50 @@ static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleM } } - private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory( + private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents( final ScheduleMode scheduleMode, - final SlotProvider slotProvider, - final Time slotRequestTimeout, - final SchedulingStrategyFactory schedulingStrategyFactory) { - - if (schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory) { - return new OneSlotPerExecutionSlotAllocatorFactory( - slotProvider, - scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, - slotRequestTimeout); - } else { - final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from( - scheduleMode, - slotProvider, - slotRequestTimeout); - - return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy); + final Configuration jobMasterConfiguration, + final SlotPool slotPool, + final Time slotRequestTimeout) { + final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration); + final BulkSlotProvider bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool); + final ExecutionSlotAllocatorFactory allocatorFactory = new OneSlotPerExecutionSlotAllocatorFactory( + bulkSlotProvider, + scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, + slotRequestTimeout); + return new DefaultSchedulerComponents( + new PipelinedRegionSchedulingStrategy.Factory(), + bulkSlotProvider::start, + allocatorFactory); + } + + @Nonnull + private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) { + final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY); + + final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy; + + locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ? + LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() : + LocationPreferenceSlotSelectionStrategy.createDefault(); + + return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ? + PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) : + locationPreferenceSlotSelectionStrategy; + } + + private static class DefaultSchedulerComponents { + private final SchedulingStrategyFactory schedulingStrategyFactory; + private final Consumer<ComponentMainThreadExecutor> slartUpAction; Review comment: `startUpAction` -> `startUpAction` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java ########## @@ -125,24 +153,50 @@ static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleM } } - private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory( + private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents( final ScheduleMode scheduleMode, - final SlotProvider slotProvider, - final Time slotRequestTimeout, - final SchedulingStrategyFactory schedulingStrategyFactory) { - - if (schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory) { - return new OneSlotPerExecutionSlotAllocatorFactory( - slotProvider, - scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, - slotRequestTimeout); - } else { - final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from( - scheduleMode, - slotProvider, - slotRequestTimeout); - - return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy); + final Configuration jobMasterConfiguration, + final SlotPool slotPool, + final Time slotRequestTimeout) { + final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration); + final BulkSlotProvider bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool); + final ExecutionSlotAllocatorFactory allocatorFactory = new OneSlotPerExecutionSlotAllocatorFactory( + bulkSlotProvider, + scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, + slotRequestTimeout); + return new DefaultSchedulerComponents( + new PipelinedRegionSchedulingStrategy.Factory(), + bulkSlotProvider::start, + allocatorFactory); + } + + @Nonnull Review comment: We can remove the `@Nonnull` annotation I think. This is because that we will by default assume it to be non-null, and only `@Nullable` annotations are required when needed. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ########## @@ -102,31 +93,19 @@ default void start(ComponentMainThreadExecutor mainThreadExecutor) { allocationTimeout); } - /** - * Allocates a bulk of physical slots. The allocation will be completed - * normally only when all the requests are fulfilled. - * - * @param physicalSlotRequests requests for physical slots - * @param timeout indicating how long it is accepted that the slot requests can be unfulfillable - * @return future of the results of slot requests - */ - default CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots( - Collection<PhysicalSlotRequest> physicalSlotRequests, - Time timeout) { - throw new UnsupportedOperationException("Not properly implemented."); - } - /** * Cancels the slot request with the given {@link SlotRequestId} and {@link SlotSharingGroupId}. * * @param slotRequestId identifying the slot request to cancel * @param slotSharingGroupId identifying the slot request to cancel * @param cause of the cancellation */ - void cancelSlotRequest( - SlotRequestId slotRequestId, - @Nullable SlotSharingGroupId slotSharingGroupId, - Throwable cause); + default void cancelSlotRequest( Review comment: Looks to me default body is not needed. I tried removing the default body and flink-runtime still compiles. Correct me if I miss anything. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org