rkhachatryan commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1136373675


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -123,9 +123,18 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
-        if (context.canScaleUp(getExecutionGraph())) {
-            getLogger().info("New resources are available. Restarting job to scale up.");
+    public void onNewResourcesAvailable() {
+        maybeRescale();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
+        maybeRescale();
+    }

Review Comment:
   1. Should there be a timeout to eventually satisfy the requirements (similar to the one in `WaitingForResources`)?
   I guess new resources can be ignored indefinitely in `onNewResourcesAvailable()`, but we can't afford that for requirements?
   
   2. Is `maybeRescale()` really suitable for new requirements?
   If some vertices are scaled up and others are scaled down, the cumulative parallelism may stay the same, and that is what `AdaptiveScheduler.shouldRescale` (called from `maybeRescale`) eventually checks.
   Or am I missing something?
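
To make the concern in point 2 concrete, here is a minimal sketch (not the actual `AdaptiveScheduler` code) that contrasts a check on the cumulative parallelism with a per-vertex comparison. The class name, the method names, and the use of plain strings as vertex IDs are all assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class RescaleCheckSketch {

    // Sums the per-vertex parallelism, as a cumulative check would.
    static int cumulative(Map<String, Integer> parallelismPerVertex) {
        int sum = 0;
        for (int parallelism : parallelismPerVertex.values()) {
            sum += parallelism;
        }
        return sum;
    }

    // Cumulative check: only the total is compared.
    static boolean cumulativeChanged(
            Map<String, Integer> current, Map<String, Integer> requested) {
        return cumulative(current) != cumulative(requested);
    }

    // Per-vertex check: any vertex with a different parallelism counts as a change.
    static boolean anyVertexChanged(
            Map<String, Integer> current, Map<String, Integer> requested) {
        return !current.equals(requested);
    }
}
```

With `source` going from 4 to 2 and `sink` going from 2 to 4, the total stays at 6, so `cumulativeChanged` returns `false` while `anyVertexChanged` returns `true`.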



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingController.java:
##########
@@ -25,16 +25,17 @@
  * Simple scaling policy for a reactive mode. The user can configure a minimum cumulative
  * parallelism increase to allow a scale up.
  */
-public class ReactiveScaleUpController implements ScaleUpController {
+public class EnforceMinimalIncreaseRescalingController implements RescalingController {
 
     private final int minParallelismIncrease;
 
-    public ReactiveScaleUpController(Configuration configuration) {
+    public EnforceMinimalIncreaseRescalingController(Configuration configuration) {
         minParallelismIncrease = configuration.get(MIN_PARALLELISM_INCREASE);
     }
 
     @Override
-    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
-        return newCumulativeParallelism - currentCumulativeParallelism >= minParallelismIncrease;
+    public boolean shouldRescale(int currentCumulativeParallelism, int newCumulativeParallelism) {
+        return Math.abs(newCumulativeParallelism - currentCumulativeParallelism)
+                >= minParallelismIncrease;

Review Comment:
   1. nit: `minParallelismIncrease` => `minParallelismChange`
   2. Or maybe it's better to have both `minIncrease` and `minDecrease`? Then at least there is no need to **rename** the config option :slightly_smiling_face: 
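
A rough sketch of what option 2 could look like, with separate thresholds for scaling up and scaling down. The class name and the `minParallelismDecrease` field are hypothetical; only `minParallelismIncrease` and the `shouldRescale` signature come from the diff above.

```java
// Hypothetical sketch; the class name and minParallelismDecrease are assumptions.
final class MinChangeRescalingSketch {

    private final int minParallelismIncrease;
    private final int minParallelismDecrease;

    MinChangeRescalingSketch(int minParallelismIncrease, int minParallelismDecrease) {
        this.minParallelismIncrease = minParallelismIncrease;
        this.minParallelismDecrease = minParallelismDecrease;
    }

    boolean shouldRescale(int currentCumulativeParallelism, int newCumulativeParallelism) {
        final int delta = newCumulativeParallelism - currentCumulativeParallelism;
        if (delta > 0) {
            // Scale up only if the gain reaches the configured minimum increase.
            return delta >= minParallelismIncrease;
        }
        if (delta < 0) {
            // Scale down only if the loss reaches the configured minimum decrease.
            return -delta >= minParallelismDecrease;
        }
        // No cumulative change: nothing to do in this sketch.
        return false;
    }
}
```

Note that this sketch never rescales when the cumulative parallelism is unchanged, which differs from the `Math.abs(...)` version when the threshold is `0`.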



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());

Review Comment:
   Here, the lower bound is always `1`, and `SlotAllocator` ignores it as well.
   
   Should we throw an `UnsupportedOperationException` if the requested lower bound is higher?
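
Something along these lines, as a sketch only. The helper class, its method, and the string vertex ID are hypothetical and not part of the PR.

```java
// Hypothetical helper; not part of the PR.
final class LowerBoundCheckSketch {

    // The only lower bound assumed to be honored in this sketch.
    static final int SUPPORTED_LOWER_BOUND = 1;

    // Rejects any requested lower bound above the supported one.
    static void checkLowerBound(String vertexId, int requestedLowerBound) {
        if (requestedLowerBound > SUPPORTED_LOWER_BOUND) {
            throw new UnsupportedOperationException(
                    "Lower bound " + requestedLowerBound + " requested for vertex " + vertexId
                            + " is not supported; only " + SUPPORTED_LOWER_BOUND + " is.");
        }
    }
}
```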



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {

Review Comment:
   I don't see where these new requirements are stored (if they changed).
   Is that done somewhere else?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java:
##########
@@ -144,12 +135,17 @@ public Logger getLogger() {
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
+    public void onNewResourcesAvailable() {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
         checkDesiredOrSufficientResourcesAvailable();
     }
 
     private void checkDesiredOrSufficientResourcesAvailable() {
-        if (context.hasDesiredResources(desiredResources)) {
+        if (context.hasDesiredResources()) {

Review Comment:
   Not directly related to this PR:
   when `resourceStabilizationDeadline` is not null, should we skip scheduling `checkDesiredOrSufficientResourcesAvailable` (on line 162)?
   Otherwise, we schedule as many checks as there are resource changes.
   Or am I missing something?
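
A minimal sketch of the guard I have in mind. This is not the real `WaitingForResources` code: the class, the counter standing in for the scheduled re-check, and the method names are assumptions.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch; scheduling is modelled by a counter instead of an executor.
final class StabilizationCheckSketch {

    // Stays null until the first resource change starts the stabilization window.
    private Instant resourceStabilizationDeadline;
    private int scheduledChecks;

    void onResourcesChanged(Instant now, Duration stabilizationTimeout) {
        if (resourceStabilizationDeadline == null) {
            resourceStabilizationDeadline = now.plus(stabilizationTimeout);
            // Stands in for scheduling a single delayed re-check at the deadline.
            scheduledChecks++;
        }
        // Otherwise a re-check is already pending, so nothing new is scheduled.
    }

    int scheduledChecks() {
        return scheduledChecks;
    }
}
```

With this guard, three resource changes in a row result in one scheduled check instead of three.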



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -216,11 +218,16 @@
     private final DeploymentStateTimeMetrics deploymentTimeMetrics;
 
     private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
+    private JobGraphJobInformation jobInformation;
+    private ResourceCounter desiredResources = ResourceCounter.empty();
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
+    private final Duration slotIdleTimeout;
+

Review Comment:
   It's actually a check interval, not a timeout, right?
   Should we then call it `slotIdlenessCheckInterval`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java:
##########
@@ -185,4 +187,28 @@ void deliverOperatorEventToCoordinator(
      */
     CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
             OperatorID operator, CoordinationRequest request) throws FlinkException;
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements}.
+     *
+     * @return Current resource requirements.
+     */
+    default JobResourceRequirements requestJobResourceRequirements() {
+        throw new UnsupportedOperationException(

Review Comment:
   IIRC, there were plans to implement `GET` for the default scheduler (please correct me if I'm wrong).
   If that's out of scope for this PR, should we create a ticket and add a `TODO` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            throw new UnsupportedOperationException(
+                    "Cannot change the parallelism of a job running in reactive mode.");
+        }
+        final Optional<VertexParallelismStore> maybeUpdateVertexParallelismStore =
+                DefaultVertexParallelismStore.applyJobResourceRequirements(
+                        jobInformation.getVertexParallelismStore(), jobResourceRequirements);
+        if (maybeUpdateVertexParallelismStore.isPresent()) {
+            this.jobInformation =
+                    new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
+            declareDesiredResources();
+            state.tryRun(
+                    ResourceListener.class,
+                    ResourceListener::onNewResourceRequirements,
+                    "Current state does not react to desired parallelism changes.");

Review Comment:
   Just to double-check:
   if the cumulative DoP doesn't change,
   - nothing is (re)declared in `declareDesiredResources()` - that's fine (?)
   - but we still want to proceed to `onNewResourceRequirements` check (?)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1213,4 +1268,17 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
     State getState() {
         return state;
     }
+
+    /**
+     * Check for slots that are idle for more than {@link JobManagerOptions#SLOT_IDLE_TIMEOUT} and
+     * release them back to the ResourceManager.
+     */
+    private void checkIdleSlotTimeout() {
+        declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());
+        getMainThreadExecutor()
+                .schedule(
+                        this::checkIdleSlotTimeout,

Review Comment:
   1. Could you explain why this method is necessary? Don't we release the excess slots in `CreatingExecutionGraph`? What is the scenario?
   
   2. Should the scheduled task be cancelled, e.g. when leadership is lost? Otherwise, won't we end up with as many scheduled tasks as there are leadership changes?
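
For point 2, a sketch of one way to keep a handle on the pending task and cancel it on shutdown. It uses a plain `ScheduledExecutorService` from the JDK rather than Flink's main thread executor, and the class and method names are hypothetical.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; uses the JDK executor instead of Flink's main thread executor.
final class IdleSlotCheckSketch implements AutoCloseable {

    private final ScheduledExecutorService executor;
    private final Runnable releaseIdleSlots;
    private final long intervalMillis;

    private ScheduledFuture<?> pendingCheck;
    private boolean closed;

    IdleSlotCheckSketch(
            ScheduledExecutorService executor, Runnable releaseIdleSlots, long intervalMillis) {
        this.executor = executor;
        this.releaseIdleSlots = releaseIdleSlots;
        this.intervalMillis = intervalMillis;
    }

    void start() {
        scheduleNext();
    }

    // Schedules the next check unless the sketch has been closed.
    private synchronized void scheduleNext() {
        if (closed) {
            return;
        }
        pendingCheck = executor.schedule(this::runCheck, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void runCheck() {
        releaseIdleSlots.run();
        scheduleNext();
    }

    // Cancels the pending check, e.g. when the scheduler is closed or loses leadership.
    @Override
    public synchronized void close() {
        closed = true;
        if (pendingCheck != null) {
            pendingCheck.cancel(false);
        }
    }

    synchronized boolean hasPendingCheck() {
        return pendingCheck != null && !pendingCheck.isDone();
    }
}
```

Because `close()` sets the flag before cancelling, a check that is already running cannot re-schedule itself afterwards.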



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

