This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 86a0d39  [FLINK-27303] Improve config cache settings + add cleanup
86a0d39 is described below

commit 86a0d396866cbd0999b599a1e2088ca765e81bae
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue May 3 14:13:06 2022 +0200

    [FLINK-27303] Improve config cache settings + add cleanup
---
 .../kubernetes_operator_config_configuration.html  | 12 ++++++++++
 .../operator/config/FlinkConfigManager.java        | 27 ++++++++++++++--------
 .../config/KubernetesOperatorConfigOptions.java    | 12 ++++++++++
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 61bef00..678a649 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>kubernetes.operator.config.cache.size</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>Max config cache size.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.config.cache.timeout</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>Expiration time for cached configs.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.deployment.readiness.timeout</h5></td>
             <td style="word-wrap: break-word;">1 min</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index da97fef..dfcfc9e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -55,9 +56,6 @@ public class FlinkConfigManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkConfigManager.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    private static final int MAX_CACHE_SIZE = 1000;
-    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
-
     private volatile Configuration defaultConfig;
     private volatile FlinkOperatorConfiguration operatorConfiguration;
     private final AtomicLong defaultConfigVersion = new AtomicLong(0);
@@ -70,10 +68,14 @@ public class FlinkConfigManager {
     }
 
     public FlinkConfigManager(Configuration defaultConfig) {
+        Duration cacheTimeout =
+                
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
         this.cache =
                 CacheBuilder.newBuilder()
-                        .maximumSize(MAX_CACHE_SIZE)
-                        .expireAfterAccess(CACHE_TIMEOUT)
+                        .maximumSize(
+                                defaultConfig.get(
+                                        
KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE))
+                        .expireAfterAccess(cacheTimeout)
                         .removalListener(
                                 removalNotification ->
                                         FlinkConfigBuilder.cleanupTmpFiles(
@@ -87,8 +89,15 @@ public class FlinkConfigManager {
                                 });
 
         updateDefaultConfig(defaultConfig);
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleWithFixedDelay(
+                cache::cleanUp,
+                cacheTimeout.toMillis(),
+                cacheTimeout.toMillis(),
+                TimeUnit.MILLISECONDS);
+
         if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
-            scheduleConfigWatcher();
+            scheduleConfigWatcher(executorService);
         }
     }
 
@@ -151,11 +160,11 @@ public class FlinkConfigManager {
         }
     }
 
-    private void scheduleConfigWatcher() {
+    private void scheduleConfigWatcher(ScheduledExecutorService 
executorService) {
         var checkInterval = 
defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
         var millis = checkInterval.toMillis();
-        Executors.newSingleThreadScheduledExecutor()
-                .scheduleAtFixedRate(new ConfigUpdater(), millis, millis, 
TimeUnit.MILLISECONDS);
+        executorService.scheduleAtFixedRate(
+                new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS);
         LOG.info("Enabled dynamic config updates, checking config changes 
every {}", checkInterval);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index cc6ef7e..cc5e883 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -122,4 +122,16 @@ public class KubernetesOperatorConfigOptions {
                     .durationType()
                     .defaultValue(Duration.ofMinutes(5))
                     .withDescription("Time interval for checking config 
changes.");
+
+    public static final ConfigOption<Duration> OPERATOR_CONFIG_CACHE_TIMEOUT =
+            ConfigOptions.key("kubernetes.operator.config.cache.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription("Expiration time for cached configs.");
+
+    public static final ConfigOption<Integer> OPERATOR_CONFIG_CACHE_SIZE =
+            ConfigOptions.key("kubernetes.operator.config.cache.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Max config cache size.");
 }

Reply via email to