This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e890caf89ef [FLINK-39016][Runtime/REST] Add configurable TTL for ExecutionGraph cache independent of web refresh interval
e890caf89ef is described below

commit e890caf89ef8ef175789e0b5896f5dd030445ad3
Author: Myracle <[email protected]>
AuthorDate: Tue Feb 3 15:41:15 2026 +0800

    [FLINK-39016][Runtime/REST] Add configurable TTL for ExecutionGraph cache independent of web refresh interval
---
 .../shortcodes/generated/expert_rest_section.html  |  6 +++
 .../shortcodes/generated/rest_configuration.html   |  6 +++
 .../apache/flink/configuration/RestOptions.java    | 22 +++++++++++
 .../flink/runtime/rest/RestEndpointFactory.java    |  4 +-
 .../rest/handler/RestHandlerConfiguration.java     | 19 +++++++++
 .../rest/handler/RestHandlerConfigurationTest.java | 46 ++++++++++++++++++++++
 6 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/expert_rest_section.html b/docs/layouts/shortcodes/generated/expert_rest_section.html
index a74404bf75a..d50b725be12 100644
--- a/docs/layouts/shortcodes/generated/expert_rest_section.html
+++ b/docs/layouts/shortcodes/generated/expert_rest_section.html
@@ -32,6 +32,12 @@
             <td>Duration</td>
             <td>Duration from write after which cached checkpoints statistics 
are cleaned up. For backwards compatibility, if no value is configured, <code 
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
         </tr>
+        <tr>
+            <td><h5>rest.cache.execution-graph.timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Time-to-live for cached ExecutionGraph. If not set, defaults 
to the value of <code class="highlighter-rouge">web.refresh-interval</code>. 
Setting this to 0 means the cache will always fetch fresh data, which is useful 
for real-time state synchronization scenarios.</td>
+        </tr>
         <tr>
             <td><h5>rest.client.max-content-length</h5></td>
             <td style="word-wrap: break-word;">104857600</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html b/docs/layouts/shortcodes/generated/rest_configuration.html
index 36a543dbbe7..3248b8d895b 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -50,6 +50,12 @@
             <td>Duration</td>
             <td>Duration from write after which cached checkpoints statistics 
are cleaned up. For backwards compatibility, if no value is configured, <code 
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
         </tr>
+        <tr>
+            <td><h5>rest.cache.execution-graph.timeout</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Time-to-live for cached ExecutionGraph. If not set, defaults 
to the value of <code class="highlighter-rouge">web.refresh-interval</code>. 
Setting this to 0 means the cache will always fetch fresh data, which is useful 
for real-time state synchronization scenarios.</td>
+        </tr>
         <tr>
             <td><h5>rest.client.max-content-length</h5></td>
             <td style="word-wrap: break-word;">104857600</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 5cda9c3ddbf..667cbda31b4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -224,6 +224,28 @@ public class RestOptions {
                     .withDescription(
                             "Maximum number of entries in the checkpoint 
statistics cache.");
 
+    /**
+     * Time-to-live for cached ExecutionGraph. If not set, defaults to the 
value of {@link
+     * WebOptions#REFRESH_INTERVAL}.
+     *
+     * <p>Setting this to 0 means the cache will always fetch fresh data, 
which is useful for
+     * real-time state synchronization scenarios where stale state information 
should be avoided.
+     */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Duration> CACHE_EXECUTION_GRAPH_TIMEOUT =
+            key("rest.cache.execution-graph.timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time-to-live for cached 
ExecutionGraph. "
+                                                    + "If not set, defaults to 
the value of %s. "
+                                                    + "Setting this to 0 means 
the cache will always fetch fresh data, "
+                                                    + "which is useful for 
real-time state synchronization scenarios.",
+                                            
code(WebOptions.REFRESH_INTERVAL.key()))
+                                    .build());
+
     /** Enables the experimental flame graph feature. */
     @Documentation.Section(Documentation.Sections.EXPERT_REST)
     public static final ConfigOption<Boolean> ENABLE_FLAMEGRAPH =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 6b1bbb8c2c0..4c7ed22935d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 
-import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -56,7 +55,6 @@ public interface RestEndpointFactory<T extends RestfulGateway> {
     static ExecutionGraphCache createExecutionGraphCache(
             RestHandlerConfiguration restConfiguration) {
         return new DefaultExecutionGraphCache(
-                restConfiguration.getTimeout(),
-                Duration.ofMillis(restConfiguration.getRefreshInterval()));
+                restConfiguration.getTimeout(), 
restConfiguration.getExecutionGraphCacheTTL());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 0f3baf82b5a..8e7590947d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -32,6 +32,8 @@ public class RestHandlerConfiguration {
 
     private final long refreshInterval;
 
+    private final Duration executionGraphCacheTTL;
+
     private final int checkpointHistorySize;
 
     private final Duration checkpointCacheExpireAfterWrite;
@@ -50,6 +52,7 @@ public class RestHandlerConfiguration {
 
     public RestHandlerConfiguration(
             long refreshInterval,
+            Duration executionGraphCacheTTL,
             int checkpointHistorySize,
             Duration checkpointCacheExpireAfterWrite,
             int checkpointCacheSize,
@@ -62,6 +65,11 @@ public class RestHandlerConfiguration {
                 refreshInterval > 0L, "The refresh interval (ms) should be 
larger than 0.");
         this.refreshInterval = refreshInterval;
 
+        this.executionGraphCacheTTL = 
Preconditions.checkNotNull(executionGraphCacheTTL);
+        Preconditions.checkArgument(
+                !executionGraphCacheTTL.isNegative(),
+                "ExecutionGraph cache TTL should not be negative.");
+
         this.checkpointHistorySize = checkpointHistorySize;
         this.checkpointCacheExpireAfterWrite = checkpointCacheExpireAfterWrite;
         this.checkpointCacheSize = checkpointCacheSize;
@@ -77,6 +85,10 @@ public class RestHandlerConfiguration {
         return refreshInterval;
     }
 
+    public Duration getExecutionGraphCacheTTL() {
+        return executionGraphCacheTTL;
+    }
+
     public int getCheckpointHistorySize() {
         return checkpointHistorySize;
     }
@@ -112,6 +124,12 @@ public class RestHandlerConfiguration {
     public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
         final long refreshInterval = 
configuration.get(WebOptions.REFRESH_INTERVAL).toMillis();
 
+        // If CACHE_EXECUTION_GRAPH_TIMEOUT is not set, fall back to 
REFRESH_INTERVAL
+        final Duration executionGraphCacheTTL =
+                configuration
+                        .getOptional(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT)
+                        .orElse(Duration.ofMillis(refreshInterval));
+
         final int checkpointHistorySize = 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE);
         final Duration checkpointStatsSnapshotCacheExpireAfterWrite =
                 
configuration.get(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT);
@@ -133,6 +151,7 @@ public class RestHandlerConfiguration {
 
         return new RestHandlerConfiguration(
                 refreshInterval,
+                executionGraphCacheTTL,
                 checkpointHistorySize,
                 checkpointStatsSnapshotCacheExpireAfterWrite,
                 checkpointStatsSnapshotCacheSize,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index caf7c70055d..b7fac5d6d89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.CsvSource;
 import java.time.Duration;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link RestHandlerConfiguration}. */
 class RestHandlerConfigurationTest {
@@ -142,4 +143,49 @@ class RestHandlerConfigurationTest {
                 RestHandlerConfiguration.fromConfiguration(config);
         
assertThat(restHandlerConfiguration.getCheckpointCacheSize()).isEqualTo(testCacheSize);
     }
+
+    /** Tests that ExecutionGraph cache TTL defaults to refresh interval when 
not set. */
+    @Test
+    void testExecutionGraphCacheTTLDefault() {
+        final Duration refreshInterval = Duration.ofMillis(5000L);
+        final Configuration config = new Configuration();
+        config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+
+        // When CACHE_EXECUTION_GRAPH_TIMEOUT is not set, it should default to 
REFRESH_INTERVAL
+        
assertThat(restHandlerConfiguration.getExecutionGraphCacheTTL()).isEqualTo(refreshInterval);
+    }
+
+    /** Tests that ExecutionGraph cache TTL can be configured independently, 
including zero. */
+    @ParameterizedTest
+    @CsvSource({"10000", "0"})
+    void testExecutionGraphCacheTTLCustomValue(long cacheTTLMillis) {
+        final Duration refreshInterval = Duration.ofMillis(5000L);
+        final Duration cacheTTL = Duration.ofMillis(cacheTTLMillis);
+        final Configuration config = new Configuration();
+        config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+        config.set(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT, cacheTTL);
+
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+
+        // CACHE_EXECUTION_GRAPH_TIMEOUT should be independent of 
REFRESH_INTERVAL
+        assertThat(restHandlerConfiguration.getRefreshInterval())
+                .isEqualTo(refreshInterval.toMillis());
+        
assertThat(restHandlerConfiguration.getExecutionGraphCacheTTL()).isEqualTo(cacheTTL);
+    }
+
+    /** Tests that negative ExecutionGraph cache TTL throws 
IllegalArgumentException. */
+    @Test
+    void testExecutionGraphCacheTTLNegativeValue() {
+        final Duration negativeCacheTTL = Duration.ofMillis(-1000L);
+        final Configuration config = new Configuration();
+        config.set(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT, 
negativeCacheTTL);
+
+        assertThatThrownBy(() -> 
RestHandlerConfiguration.fromConfiguration(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("ExecutionGraph cache TTL should not be 
negative");
+    }
 }

Reply via email to