This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 86a0d39 [FLINK-27303] Improve config cache settings + add cleanup 86a0d39 is described below commit 86a0d396866cbd0999b599a1e2088ca765e81bae Author: Gyula Fora <g_f...@apple.com> AuthorDate: Tue May 3 14:13:06 2022 +0200 [FLINK-27303] Improve config cache settings + add cleanup --- .../kubernetes_operator_config_configuration.html | 12 ++++++++++ .../operator/config/FlinkConfigManager.java | 27 ++++++++++++++-------- .../config/KubernetesOperatorConfigOptions.java | 12 ++++++++++ 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 61bef00..678a649 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -8,6 +8,18 @@ </tr> </thead> <tbody> + <tr> + <td><h5>kubernetes.operator.config.cache.size</h5></td> + <td style="word-wrap: break-word;">1000</td> + <td>Integer</td> + <td>Max config cache size.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.config.cache.timeout</h5></td> + <td style="word-wrap: break-word;">10 min</td> + <td>Duration</td> + <td>Expiration time for cached configs.</td> + </tr> <tr> <td><h5>kubernetes.operator.deployment.readiness.timeout</h5></td> <td style="word-wrap: break-word;">1 min</td> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index da97fef..dfcfc9e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,9 +56,6 @@ public class FlinkConfigManager { private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class); private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final int MAX_CACHE_SIZE = 1000; - private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30); - private volatile Configuration defaultConfig; private volatile FlinkOperatorConfiguration operatorConfiguration; private final AtomicLong defaultConfigVersion = new AtomicLong(0); @@ -70,10 +68,14 @@ public class FlinkConfigManager { } public FlinkConfigManager(Configuration defaultConfig) { + Duration cacheTimeout = + defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT); this.cache = CacheBuilder.newBuilder() - .maximumSize(MAX_CACHE_SIZE) - .expireAfterAccess(CACHE_TIMEOUT) + .maximumSize( + defaultConfig.get( + KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE)) + .expireAfterAccess(cacheTimeout) .removalListener( removalNotification -> FlinkConfigBuilder.cleanupTmpFiles( @@ -87,8 +89,15 @@ public class FlinkConfigManager { }); updateDefaultConfig(defaultConfig); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleWithFixedDelay( + cache::cleanUp, + cacheTimeout.toMillis(), + cacheTimeout.toMillis(), + TimeUnit.MILLISECONDS); + if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) { - scheduleConfigWatcher(); + scheduleConfigWatcher(executorService); } } @@ -151,11 +160,11 @@ public class FlinkConfigManager { } } - private void scheduleConfigWatcher() { + private void scheduleConfigWatcher(ScheduledExecutorService executorService) { var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL); var millis = checkInterval.toMillis(); - Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate(new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS); + executorService.scheduleAtFixedRate( + new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS); LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index cc6ef7e..cc5e883 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -122,4 +122,16 @@ public class KubernetesOperatorConfigOptions { .durationType() .defaultValue(Duration.ofMinutes(5)) .withDescription("Time interval for checking config changes."); + + public static final ConfigOption<Duration> OPERATOR_CONFIG_CACHE_TIMEOUT = + ConfigOptions.key("kubernetes.operator.config.cache.timeout") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription("Expiration time for cached configs."); + + public static final ConfigOption<Integer> OPERATOR_CONFIG_CACHE_SIZE = + ConfigOptions.key("kubernetes.operator.config.cache.size") + .intType() + .defaultValue(1000) + .withDescription("Max config cache size."); }