ferenc-csaky commented on code in PR #27539:
URL: https://github.com/apache/flink/pull/27539#discussion_r3006204146


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1133,6 +1154,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
         }
     }
 
+    private void recordRescaleForNewResourceRequirements() {
+        rescaleTimeline.updateRescale(
+                rescale ->
+                        rescale.addSchedulerState(state)
+                                .setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
+                                .setEndTimestamp(Instant.now().toEpochMilli())
+                                .log());
+        rescaleTimeline.newRescale(true);
+        rescaleTimeline.updateRescale(
+                rescale ->
+                        rescale.setStartTimestamp(Instant.now().toEpochMilli())

Review Comment:
   I think it would be worth adding `setStartToNow()` and `setEndToNow()` methods that 
call the respective setters with `Instant.now().toEpochMilli()`, and using them where 
we can. Most of the time this is what happens, so we could shorten the code a bit.
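
A minimal sketch of what I mean, assuming a fluent builder-style type. `RescaleSketch` and its getters are hypothetical stand-ins, not the actual class from this PR; only the two `...ToNow()` methods are the point:

```java
import java.time.Instant;

// Hypothetical stand-in for the PR's rescale record; only the timestamp setters are modeled.
class RescaleSketch {
    private long startTimestamp;
    private long endTimestamp;

    RescaleSketch setStartTimestamp(long epochMillis) {
        this.startTimestamp = epochMillis;
        return this;
    }

    RescaleSketch setEndTimestamp(long epochMillis) {
        this.endTimestamp = epochMillis;
        return this;
    }

    // Suggested convenience methods: delegate to the existing setters with the current time.
    RescaleSketch setStartToNow() {
        return setStartTimestamp(Instant.now().toEpochMilli());
    }

    RescaleSketch setEndToNow() {
        return setEndTimestamp(Instant.now().toEpochMilli());
    }

    long getStartTimestamp() {
        return startTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}
```

Call sites could then read `rescale.setStartToNow()` instead of spelling out `Instant.now().toEpochMilli()` each time, while the explicit setters stay available for tests that need fixed timestamps.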



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1336,13 +1389,55 @@ public void goToRestarting(
         }
     }
 
+    private void recordRescaleForJobRestarting(VertexParallelism restartWithParallelism) {
+        if (restartWithParallelism == null) {

Review Comment:
   I'd probably invert this `if`: do the normal rescaling first, then return 
explicitly. That way we can drop the `else` branch and the nested `if`, and save one 
indentation level.
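
Roughly the shape I have in mind, as a sketch only. `RestartRecordingSketch`, the `Integer` parameter (standing in for `VertexParallelism`), the `rescaleInProgress` flag, and the recorded strings are all hypothetical placeholders, since the rest of the method body is not quoted here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the guard-clause layout; not the PR's actual logic.
class RestartRecordingSketch {
    final List<String> events = new ArrayList<>();

    void recordRescaleForJobRestarting(Integer restartWithParallelism, boolean rescaleInProgress) {
        // Normal rescaling path first, followed by an explicit return.
        if (restartWithParallelism != null) {
            events.add("rescale:" + restartWithParallelism);
            return;
        }

        // What used to be the nested `if` now sits at the top indentation level.
        if (rescaleInProgress) {
            events.add("terminate-current-rescale");
        }
    }
}
```

The behavior is the same as with the `if`/`else` form; only the nesting changes.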



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -149,19 +175,31 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
 
     @Override
     public void transitionToSubsequentState() {
+        Optional<VertexParallelism> availableVertexParallelism =
+                context.getAvailableVertexParallelism();
+        if (!availableVertexParallelism.isPresent()) {

Review Comment:
   Use `availableVertexParallelism.isEmpty()` instead.
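
For reference, `Optional.isEmpty()` has been available since Java 11 and is the direct negation of `isPresent()`. A small self-contained sketch, where `OptionalIsEmptySketch`, `describe`, and the `Integer` payload are hypothetical and only stand in for the `VertexParallelism` case:

```java
import java.util.Optional;

// Hypothetical helper showing isEmpty() in place of !isPresent().
class OptionalIsEmptySketch {
    static String describe(Optional<Integer> availableParallelism) {
        // Equivalent to !availableParallelism.isPresent(), but reads without the negation.
        if (availableParallelism.isEmpty()) {
            return "no parallelism available";
        }
        return "parallelism " + availableParallelism.get();
    }
}
```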



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -49,6 +59,10 @@ default void close() {}
      */
     interface Context {
 
+        State schedulerState();
+
+        RescaleTimeline getRescaleTimeline();

Review Comment:
   I think I'd rather add a common interface (e.g. `HasRescaleTimeline`) that 
declares `getRescaleTimeline` and then have `Context` extend that interface here 
and in `StateWithExecutionGraph` and `StateWithoutExecutionGraph` as well, so we 
don't duplicate this method 3 times.
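
Something along these lines, as a sketch under assumptions. The placeholder `RescaleTimeline`, the three `...Context` interface names, the `String` return type of `schedulerState()`, and `SchedulerSketch` are hypothetical simplifications; I am assuming each of the three types exposes its own `Context` interface:

```java
// Placeholder for the real RescaleTimeline type.
interface RescaleTimeline {}

// The single place where the accessor is declared.
interface HasRescaleTimeline {
    RescaleTimeline getRescaleTimeline();
}

// Each context extends the shared interface instead of redeclaring the method.
interface StateTransitionManagerContext extends HasRescaleTimeline {
    String schedulerState();
}

interface StateWithExecutionGraphContext extends HasRescaleTimeline {}

interface StateWithoutExecutionGraphContext extends HasRescaleTimeline {}

// One implementation satisfies all three contexts with a single method.
class SchedulerSketch
        implements StateTransitionManagerContext,
                StateWithExecutionGraphContext,
                StateWithoutExecutionGraphContext {
    private final RescaleTimeline timeline = new RescaleTimeline() {};

    @Override
    public RescaleTimeline getRescaleTimeline() {
        return timeline;
    }

    @Override
    public String schedulerState() {
        return "Executing";
    }
}
```

Code that only needs the timeline can then accept a `HasRescaleTimeline` rather than any one specific context type.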



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
