1996fanrui commented on code in PR #762:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1482361348


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -250,6 +251,31 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .withDescription(
                             "Max allowed percentage of heap usage during 
scaling operations. Autoscaling will be paused if the heap usage exceeds this 
threshold.");
 
+    public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+            autoScalerConfig("memory.tuning.heap.min")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(512L))
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+                    .withDescription(
+                            "The minimum amount of TaskManager memory, if 
memory tuning is enabled.");
+
+    public static final ConfigOption<Boolean> 
MEMORY_TUNING_TRANSFER_HEAP_TO_MANAGED =
+            autoScalerConfig("memory.tuning.heap.transfer-to-managed")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(
+                            
oldOperatorConfigKey("memory.tuning.heap.transfer-to-managed"))
+                    .withDescription(
+                            "If enabled, any reduction of heap memory will 
increase the managed memory for RocksDB. RocksDB needs to be enabled.");

Review Comment:
   ```suggestion
                               "If enabled, any reduction of heap memory will 
increase the managed memory for RocksDB when rocksdb state backend is used.");
   ```
   
   How about this?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##########
@@ -52,19 +55,25 @@ public class JobAutoScalerContext<KEY> {
 
     @Nullable @Getter private final JobStatus jobStatus;
 
+    /** The configuration derived from the current spec. */

Review Comment:
   ```suggestion
       /** The configuration derived from the user-specified spec instead of 
actual spec. */
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java:
##########
@@ -32,7 +32,7 @@
 class ResourceCheckUtilsTest {
 
     @Test
-    void testEstimateNumTaskSlotsAfterRescale() {
+    void estimateNumTaskSlotsAfterRescale() {

Review Comment:
   Why remove the test prefix? In general, the prefix of all test methods are 
`test`.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -65,7 +65,7 @@ public KubernetesJobAutoScalerContext 
getJobAutoScalerContext() {
     }
 
     private KubernetesJobAutoScalerContext createJobAutoScalerContext() {
-        Configuration conf = new Configuration(getObserveConfig());
+        Configuration conf = new 
Configuration(getDeployConfig(resource.getSpec()));

Review Comment:
   Using `getDeployConfig` here makes sense to me. And I left a comment about 
the code comment of `JobAutoScalerContext#configuration`.
   
   It's better to remind others it's user-specified spec instead of actual spec.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java:
##########
@@ -43,6 +53,34 @@ public void realize(
                         getOverrideString(context, parallelismOverrides));
     }
 
+    @Override
+    public void realizeConfigOverrides(
+            KubernetesJobAutoScalerContext context, Configuration 
configOverrides) {
+        if (!(context.getResource() instanceof FlinkDeployment)) {
+            // We can't adjust the configuration of non-job deployments.
+            return;
+        }
+        FlinkDeployment flinkDeployment = ((FlinkDeployment) 
context.getResource());
+        // Apply config overrides
+        
flinkDeployment.getSpec().getFlinkConfiguration().putAll(configOverrides.toMap());
+
+        // Update total memory in spec
+        var totalMemoryOverride = 
MemoryTuningUtils.getTotalMemory(configOverrides, context);
+        if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Memory override {} is not valid", totalMemoryOverride);

Review Comment:
   nit:
   ```suggestion
               LOG.warn("Total memory override {} is not valid", 
totalMemoryOverride);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to