[PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-15 Thread via GitHub


afedulov opened a new pull request, #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735

   This PR contains the following improvements to the restart tracking logic:
   * Adds more debug logs
   * Stores the restart `Duration` directly instead of the `endTime` `Instant`
   * Fixes a bug that made restart duration tracking dependent on whether metrics are considered fully collected
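A minimal sketch of the idea behind the second and third bullets, under simplifying assumptions. The tracking entry keeps the time the scaling decision was applied plus the expected per-vertex parallelism. Once the running job reports that parallelism, it stores the elapsed time as a `java.time.Duration` instead of an end-time `Instant`. Nothing in it consults metric collection, which mirrors the third bullet. `RestartTrackingSketch`, its fields and `recordIfParallelismMatches` are hypothetical and do not reproduce the operator's actual `ScalingTracking` class or its method signatures.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

// Hypothetical, simplified stand-in for the autoscaler's scaling tracking entry.
class RestartTrackingSketch {
    private final Instant scalingStartTime;               // when the scaling decision was applied
    private final Map<String, Integer> targetParallelism; // vertex id -> expected parallelism
    private Duration restartDuration;                     // null until the restart is measured

    RestartTrackingSketch(Instant scalingStartTime, Map<String, Integer> targetParallelism) {
        this.scalingStartTime = scalingStartTime;
        this.targetParallelism = Map.copyOf(targetParallelism);
    }

    // Records the restart duration once, if the job already runs with the target parallelism.
    // Returns true only when a new duration was recorded, i.e. when the state should be persisted.
    // Note: metric collection status plays no role in this decision.
    boolean recordIfParallelismMatches(Instant now, Map<String, Integer> actualParallelism) {
        if (restartDuration != null || !targetParallelism.equals(actualParallelism)) {
            return false; // already recorded, or the rescale has not taken effect yet
        }
        restartDuration = Duration.between(scalingStartTime, now);
        return true;
    }

    Duration getRestartDuration() {
        return restartDuration;
    }
}
```

Storing the `Duration` lets a reader of the persisted state use the measured restart time directly rather than recomputing it from two timestamps; the PR description does not say whether that was the main motivation.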


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-18 Thread via GitHub


mxm commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1429815765


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
 jobTopology.getVerticesInTopologicalOrder(),
 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-if (!collectedMetrics.isFullyCollected()) {
-// We have done an upfront evaluation, but we are not ready for scaling.
-resetRecommendedParallelism(evaluatedMetrics);
-return;
-}
-
 var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
 // A scaling tracking without an end time gets created whenever a scaling decision is
 // applied. Here, when the job transitions to RUNNING, we record the time for it.
 if (ctx.getJobStatus() == JobStatus.RUNNING) {
-if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
 now, jobTopology, scalingHistory)) {
 stateStore.storeScalingTracking(ctx, scalingTracking);
 }
 }

Review Comment:
   We don't need the RUNNING job state check. This block can be reduced to:
   
   ```java
   if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
           now, jobTopology, scalingHistory)) {
       stateStore.storeScalingTracking(ctx, scalingTracking);
   }
   ```
   
   The reason is that this method only gets called when the job is in the RUNNING state (see line 99). Enforcing a RUNNING state has always been a precondition for executing the autoscaling logic.
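To illustrate the point about the precondition, here is a hypothetical, heavily simplified flow in which the RUNNING check is enforced once at the entry point, so the method that records the restart duration never needs to repeat it. `JobStatusSketch`, `AutoscalerFlowSketch` and the log strings are illustrative assumptions, not the autoscaler's real types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical job states; only RUNNING matters for this illustration.
enum JobStatusSketch { CREATED, RUNNING, RESTARTING }

// Hypothetical, simplified autoscaler flow showing where the RUNNING precondition lives.
class AutoscalerFlowSketch {
    final List<String> log = new ArrayList<>();

    // Entry point: the RUNNING precondition is checked exactly once, here.
    void scale(JobStatusSketch status) {
        if (status != JobStatusSketch.RUNNING) {
            log.add("skipped: job not running");
            return;
        }
        recordRestartDuration();
    }

    // Only reachable for RUNNING jobs, so a second status check would be redundant.
    private void recordRestartDuration() {
        log.add("restart duration recorded");
    }
}
```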




Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-18 Thread via GitHub


mxm commented on PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#issuecomment-1859964401

   This needs a rebase. I'll run the tests afterwards.



Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-19 Thread via GitHub


gyfora commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1431637325


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##

Review Comment:
   Can you please extract this logic into a method to keep the flow simpler?




Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-19 Thread via GitHub


gyfora commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1431638371


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##

Review Comment:
   It could be part of `stateStore.storeScalingTracking(ctx, scalingTracking);` even




Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2023-12-19 Thread via GitHub


mxm commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1431686776


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##

Review Comment:
   I was going to recommend this as well. But these are only three lines of 
code (after removing the unneeded RUNNING condition) and the resulting method 
signature would be quite big. I don't think we need to block the PR on this 
refactoring.




Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2024-01-09 Thread via GitHub


afedulov commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1446480705


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##

Review Comment:
   > It could be part of `stateStore.storeScalingTracking(ctx, scalingTracking);` even

   I removed the redundant RUNNING check, as Max recommended, so it looks more straightforward now. Pushing this call down into `storeScalingTracking` would make the code harder to reason about, since it is key that `runScalingLogic` is only executed when the job is in the RUNNING state, and hence the transition is considered complete. It also does not seem right to bundle logic specific to this concrete situation into `KubernetesAutoScalerStateStore`, which acts more as a simple persistence layer. Hope this is fine with you.
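A minimal sketch of the separation described above, assuming a simplified in-memory store: the store only saves and loads values, while the caller decides whether a restart duration should be recorded. `InMemoryTrackingStore` and `RestartRecorder` are hypothetical names; they stand in for, but do not reproduce, `KubernetesAutoScalerStateStore` and the logic in `JobAutoScalerImpl`.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical persistence layer: it stores and loads values and makes no decisions.
class InMemoryTrackingStore {
    private final Map<String, Duration> restartDurations = new HashMap<>();

    void storeRestartDuration(String jobKey, Duration restartDuration) {
        restartDurations.put(jobKey, restartDuration);
    }

    Optional<Duration> loadRestartDuration(String jobKey) {
        return Optional.ofNullable(restartDurations.get(jobKey));
    }
}

// Hypothetical caller: owns the decision of whether to record, then delegates persistence.
class RestartRecorder {
    private final InMemoryTrackingStore store;

    RestartRecorder(InMemoryTrackingStore store) {
        this.store = store;
    }

    // Assumed to be invoked only for RUNNING jobs, so reaching this point means the restart
    // has completed. Returns true if a new duration was persisted.
    boolean onRunning(String jobKey, Duration sinceScaling, boolean parallelismMatches) {
        if (!parallelismMatches || store.loadRestartDuration(jobKey).isPresent()) {
            return false; // rescale not in effect yet, or duration already recorded
        }
        store.storeRestartDuration(jobKey, sinceScaling);
        return true;
    }
}
```

One consequence of this split is that the store carries no assumptions about job state, so replacing it with a different persistence backend would not require moving the RUNNING-state reasoning along with it.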



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##

Review Comment:
   Good catch, thanks.




Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2024-01-09 Thread via GitHub


afedulov commented on PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#issuecomment-1883606900

   @mxm I addressed the comments and rebased. Could you please kick off the tests again?



Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2024-01-09 Thread via GitHub


mxm commented on PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#issuecomment-1883609244

   Sure, they should be running now.



Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2024-01-10 Thread via GitHub


mxm merged PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735



Re: [PR] [FLINK-30593][autoscaler] Improve restart time tracking [flink-kubernetes-operator]

2024-01-10 Thread via GitHub


afedulov commented on PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#issuecomment-1884713935

   @mxm @gyfora 
   Thanks!

