ferenc-csaky commented on code in PR #27539:
URL: https://github.com/apache/flink/pull/27539#discussion_r3006204146
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1133,6 +1154,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
}
}
+ private void recordRescaleForNewResourceRequirements() {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .log());
+ rescaleTimeline.newRescale(true);
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setStartTimestamp(Instant.now().toEpochMilli())
Review Comment:
I think it would be worth adding `setStartToNow()` and `setEndToNow()` methods
that call the respective setter with `Instant.now().toEpochMilli()`, and using
them wherever we can. This is what happens most of the time, so it would
shorten the call sites a bit.
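
A minimal sketch of what I have in mind. `RescaleSketch` is a hypothetical stand-in for the builder used in `updateRescale`, and the getters exist only for illustration; only the two `...ToNow()` methods are the actual proposal.

```java
import java.time.Instant;

// Hypothetical stand-in for the rescale builder in this PR; only the
// timestamp setters are modeled here.
class RescaleSketch {
    private long startTimestamp;
    private long endTimestamp;

    RescaleSketch setStartTimestamp(long epochMilli) {
        this.startTimestamp = epochMilli;
        return this;
    }

    RescaleSketch setEndTimestamp(long epochMilli) {
        this.endTimestamp = epochMilli;
        return this;
    }

    // Proposed convenience methods: delegate to the existing setters
    // with the current wall-clock time.
    RescaleSketch setStartToNow() {
        return setStartTimestamp(Instant.now().toEpochMilli());
    }

    RescaleSketch setEndToNow() {
        return setEndTimestamp(Instant.now().toEpochMilli());
    }

    // Illustrative getters, not part of the proposal.
    long getStartTimestamp() {
        return startTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}
```

The call site above would then read `rescale.setStartToNow()` instead of spelling out the `Instant` conversion each time.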
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1336,13 +1389,55 @@ public void goToRestarting(
}
}
+ private void recordRescaleForJobRestarting(VertexParallelism restartWithParallelism) {
+ if (restartWithParallelism == null) {
Review Comment:
I'd probably invert this `if`: handle the normal rescaling case first and
return explicitly. That way we can drop the `else` branch and the nested `if`,
and save one indentation level.
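
Roughly the shape I mean, as a sketch only. `RestartRecorderSketch`, the `Integer` parameter (standing in for `VertexParallelism`), and the recorded event strings are all hypothetical placeholders, not the actual method body from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the guard-clause shape; the real method
// works on VertexParallelism and the rescale timeline.
class RestartRecorderSketch {
    final List<String> events = new ArrayList<>();

    void recordRescaleForJobRestarting(Integer restartWithParallelism) {
        if (restartWithParallelism != null) {
            // Normal rescaling path first, then return explicitly.
            events.add("rescale to parallelism " + restartWithParallelism);
            return;
        }
        // Remaining logic sits at the top indentation level, with no else.
        events.add("restart without known parallelism");
    }
}
```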
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -149,19 +175,31 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
@Override
public void transitionToSubsequentState() {
+ Optional<VertexParallelism> availableVertexParallelism =
+ context.getAvailableVertexParallelism();
+ if (!availableVertexParallelism.isPresent()) {
Review Comment:
Use `availableVertexParallelism.isEmpty()` instead.
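
For reference, `Optional.isEmpty()` is available since Java 11 and is the direct negation of `isPresent()`. A small sketch, where `OptionalEmptySketch` and the `Integer` payload are hypothetical stand-ins for the real types:

```java
import java.util.Optional;

// Hypothetical helper showing isEmpty() in place of !isPresent().
class OptionalEmptySketch {
    static String describe(Optional<Integer> availableParallelism) {
        if (availableParallelism.isEmpty()) {
            return "no parallelism available";
        }
        return "parallelism " + availableParallelism.get();
    }
}
```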
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -49,6 +59,10 @@ default void close() {}
*/
interface Context {
+ State schedulerState();
+
+ RescaleTimeline getRescaleTimeline();
Review Comment:
I think I'd rather add a common interface (e.g. `HasRescaleTimeline`) that
declares `getRescaleTimeline`, and then have `Context` here, as well as
`StateWithExecutionGraph` and `StateWithoutExecutionGraph`, extend that
interface, so we don't declare this method 3 times.
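
Something along these lines, as a sketch. `HasRescaleTimeline` is only a suggested name, and the `...Sketch` types are hypothetical placeholders for `RescaleTimeline`, `Context`, and the state classes in the PR.

```java
// Placeholder for the real RescaleTimeline type.
interface RescaleTimelineSketch {}

// Proposed common interface: the getter is declared once.
interface HasRescaleTimeline {
    RescaleTimelineSketch getRescaleTimeline();
}

// Stand-in for StateTransitionManager.Context: inherits the getter.
interface ContextSketch extends HasRescaleTimeline {
    String schedulerState();
}

// Stand-in for a state type that also needs the timeline.
abstract class StateWithExecutionGraphSketch implements HasRescaleTimeline {}
```

Callers that only need the timeline can then accept a `HasRescaleTimeline` and work with any of the three types.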
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.