Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gyfora merged PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1570203474

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -129,8 +133,10 @@ public boolean scaleResource(
                 scalingSummaries,
                 autoScalerEventHandler);
-        if (scalingWouldExceedClusterResources(
-                configOverrides.newConfigWithOverrides(conf),
+        var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+        if (scalingWouldExceedMaxResources(
+                memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
I had forgotten, but I've added one now.
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gyfora commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1565696510

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -129,8 +133,10 @@ public boolean scaleResource(
                 scalingSummaries,
                 autoScalerEventHandler);
-        if (scalingWouldExceedClusterResources(
-                configOverrides.newConfigWithOverrides(conf),
+        var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+        if (scalingWouldExceedMaxResources(
+                memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
Did you add a test for this, @gaborgsomogyi?
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562336481

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -129,8 +133,10 @@ public boolean scaleResource(
                 scalingSummaries,
                 autoScalerEventHandler);
-        if (scalingWouldExceedClusterResources(
-                configOverrides.newConfigWithOverrides(conf),
+        var memoryTuningEnabled = conf.get(AutoScalerOptions.MEMORY_TUNING_ENABLED);
+        if (scalingWouldExceedMaxResources(
+                memoryTuningEnabled ? configOverrides.newConfigWithOverrides(conf) : conf,

Review Comment:
@mxm, this is basically a bug fix. The original code assumed that memory tuning is always enabled, which is not the case: the memory overrides should only be applied to the checked configuration when `MEMORY_TUNING_ENABLED` is set.
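As a minimal sketch of the fix described above (not the operator's actual code), the resource check should only see the tuned memory overrides when memory tuning is enabled. Plain `Map<String, String>` objects stand in for Flink's `Configuration` and the operator's config overrides, `ResourceCheckConfig` and `effectiveConfig` are hypothetical names, and the sketch assumes the overrides contain only memory-tuning changes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; string maps stand in for Flink's Configuration and the
// memory-tuning overrides so the sketch is self-contained.
final class ResourceCheckConfig {

    /**
     * Returns the configuration the max-resource check should evaluate.
     * Assumes the overrides only contain memory-tuning changes.
     */
    static Map<String, String> effectiveConfig(
            Map<String, String> conf, Map<String, String> overrides, boolean memoryTuningEnabled) {
        if (!memoryTuningEnabled) {
            // Tuning is off: the job keeps its original memory settings, so check those.
            return conf;
        }
        // Tuning is on: check the configuration the job would be redeployed with.
        Map<String, String> merged = new HashMap<>(conf);
        merged.putAll(overrides);
        return merged;
    }
}
```

Copying before merging leaves the caller's `conf` untouched, in line with the name `newConfigWithOverrides`, which suggests a new configuration is produced rather than the existing one being mutated.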
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#issuecomment-2051448683

@mxm, could you please take a second look? @gyfora and I talked it through, and the changes reflect that discussion.
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562302534

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
+    protected static boolean resourceQuotaReached(

Review Comment:
I did a deep dive here, and my conclusion is to have a single function in which both checks are called. The two look similar, but what they actually have in common is more or less the following:
```
var tmCpu = ctx.getTaskManagerCpu().orElse(0.);
var tmMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
var numSlotsPerTm = tunedConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
```
So I've wrapped them together, but in a separate function.
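One way to read the refactoring described in this comment is sketched below, under stated assumptions: the three shared per-TaskManager values are computed once and handed to both checks from a single wrapper. `TmResources` is a hypothetical holder, and the two `Predicate` arguments are hypothetical stand-ins for the cluster-resource check and the quota check; only the name `scalingWouldExceedMaxResources` comes from the PR, and its real signature is different.

```java
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical holder for the per-TaskManager values both checks share. */
final class TmResources {
    final double cpu;       // cores per TaskManager, 0 when unknown
    final long memoryBytes; // total memory per TaskManager
    final int slotsPerTm;   // task slots per TaskManager

    TmResources(Optional<Double> cpu, long memoryBytes, int slotsPerTm) {
        this.cpu = cpu.orElse(0.); // same defaulting as ctx.getTaskManagerCpu().orElse(0.)
        this.memoryBytes = memoryBytes;
        this.slotsPerTm = slotsPerTm;
    }
}

final class MaxResources {

    /** Runs both (hypothetical) checks against the same precomputed values. */
    static boolean scalingWouldExceedMaxResources(
            TmResources tm,
            Predicate<TmResources> exceedsClusterResources,
            Predicate<TmResources> exceedsQuota) {
        return exceedsClusterResources.test(tm) || exceedsQuota.test(tm);
    }
}
```

Because `||` short-circuits, the quota check is skipped whenever the cluster-resource check has already reported that scaling would not fit.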
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562195810

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -115,6 +119,17 @@ public boolean scaleResource(
             return false;
         }
+        if (resourceQuotaReached(conf, evaluatedMetrics, scalingSummaries, context)) {
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Warning,
+                    "ResourceQuotaReached",
+                    RESOURCE_QUOTA_REACHED_MESSAGE,
+                    null,
+                    conf.get(SCALING_EVENT_INTERVAL));
+            return false;
+        }

Review Comment:
Makes sense, moved.
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562107810

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ##
@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
         return get(jobVertexID).getInputs().isEmpty();
     }
-    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-        get(vertexID).updateMaxParallelism(maxParallelism);

Review Comment:
This was removed by accident; I've added it back.
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1562107436

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ##
@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
                         "Updating source {} max parallelism based on available partitions to {}",
                         sourceVertex,
                         numPartitions);
-                topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+                topology.get(sourceVertex).setMaxParallelism((int) numPartitions);

Review Comment:
Nice catch, fixed.
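The difference between calling a plain setter and going through an update method can be illustrated with a hypothetical vertex class. The excerpt does not show the body of `updateMaxParallelism`; this sketch assumes, for illustration only, that it caps the new value (here, the Kafka partition count) at the vertex's original max parallelism, a guard that calling `setMaxParallelism` directly would bypass.

```java
// Hypothetical, simplified stand-in for a job topology vertex.
final class SourceVertex {
    private final int originalMaxParallelism; // max parallelism reported by the job
    private int maxParallelism;

    SourceVertex(int originalMaxParallelism) {
        this.originalMaxParallelism = originalMaxParallelism;
        this.maxParallelism = originalMaxParallelism;
    }

    int getMaxParallelism() {
        return maxParallelism;
    }

    /** Plain setter: accepts any value, even one above the job's own limit. */
    void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /** Assumed semantics: follow the candidate, but never exceed the original max parallelism. */
    void updateMaxParallelism(int candidate) {
        setMaxParallelism(Math.min(originalMaxParallelism, candidate));
    }
}
```

With 500 partitions and a job max parallelism of 128, `updateMaxParallelism(500)` keeps 128 under this assumption, while `setMaxParallelism(500)` would allow 500.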
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560631310

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map scalingSummaries,
+            JobAutoScalerContext ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap();
+            var newSlotSharingGroupMaxParallelisms = new HashMap();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
In that case `jobTopology.getSlotSharingGroupMapping().isEmpty()` is true, so it's already handled here: https://github.com/apache/flink-kubernetes-operator/pull/789/files#diff-7fdb929157b6a94cc180c67b8dddb6722d26a7b8ea8259eb7cb9ef84b9a418a3R224-R227
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560668425

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java: ##
@@ -30,6 +31,7 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class EvaluatedMetrics {
+    private JobTopology jobTopology;

Review Comment:
Nice catch, removed.
Re: [PR] [FLINK-34574] Add CPU and memory size autoscaler quota [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1560631310

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java: ##
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map scalingSummaries,
+            JobAutoScalerContext ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap();
+            var newSlotSharingGroupMaxParallelisms = new HashMap();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
In that case the job topology is null, so it's already handled here: https://github.com/apache/flink-kubernetes-operator/pull/789/files#diff-7fdb929157b6a94cc180c67b8dddb6722d26a7b8ea8259eb7cb9ef84b9a418a3R224-R227
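The quota check in this hunk can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: vertices in the same slot sharing group share slots, so each group needs as many slots as its most parallel vertex; the TaskManager count is assumed to be the total slot count divided by slots per TaskManager, rounded up; and only the new parallelisms are evaluated, whereas the PR also tracks the current ones. `QuotaCheck`, the string group ids, and the byte-based memory quota are hypothetical. The guard at the top mirrors the null/empty topology handling discussed in the comment.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class QuotaCheck {

    /** Returns true when the new parallelisms would need more CPU or memory than a configured quota. */
    static boolean resourceQuotaReached(
            Map<String, List<Integer>> newParallelismsBySlotSharingGroup,
            int slotsPerTm,
            double tmCpu,
            long tmMemoryBytes,
            Optional<Double> cpuQuota,
            Optional<Long> memoryQuotaBytes) {
        // No topology or no slot sharing information: nothing to check.
        if (newParallelismsBySlotSharingGroup == null || newParallelismsBySlotSharingGroup.isEmpty()) {
            return false;
        }
        // No quota configured: never blocks scaling.
        if (cpuQuota.isEmpty() && memoryQuotaBytes.isEmpty()) {
            return false;
        }
        // Each slot sharing group needs as many slots as its most parallel vertex.
        int totalSlots = 0;
        for (List<Integer> parallelisms : newParallelismsBySlotSharingGroup.values()) {
            totalSlots += parallelisms.stream().mapToInt(Integer::intValue).max().orElse(0);
        }
        int numTms = (totalSlots + slotsPerTm - 1) / slotsPerTm; // ceiling division
        double totalCpu = numTms * tmCpu;
        long totalMemoryBytes = numTms * tmMemoryBytes;
        boolean cpuExceeded = cpuQuota.map(q -> totalCpu > q).orElse(false);
        boolean memoryExceeded = memoryQuotaBytes.map(q -> totalMemoryBytes > q).orElse(false);
        return cpuExceeded || memoryExceeded;
    }
}
```

For two groups whose most parallel vertices have parallelism 8 and 2, with 2 slots per TaskManager, the job needs 10 slots and therefore 5 TaskManagers; at 1 core each that is 5 cores, so a CPU quota of 5.0 is not exceeded but a quota of 4.5 is.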