This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e890caf89ef [FLINK-39016][Runtime/REST] Add configurable TTL for ExecutionGraph cache independent of web refresh interval
e890caf89ef is described below
commit e890caf89ef8ef175789e0b5896f5dd030445ad3
Author: Myracle <[email protected]>
AuthorDate: Tue Feb 3 15:41:15 2026 +0800
[FLINK-39016][Runtime/REST] Add configurable TTL for ExecutionGraph cache independent of web refresh interval
---
.../shortcodes/generated/expert_rest_section.html | 6 +++
.../shortcodes/generated/rest_configuration.html | 6 +++
.../apache/flink/configuration/RestOptions.java | 22 +++++++++++
.../flink/runtime/rest/RestEndpointFactory.java | 4 +-
.../rest/handler/RestHandlerConfiguration.java | 19 +++++++++
.../rest/handler/RestHandlerConfigurationTest.java | 46 ++++++++++++++++++++++
6 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/expert_rest_section.html
b/docs/layouts/shortcodes/generated/expert_rest_section.html
index a74404bf75a..d50b725be12 100644
--- a/docs/layouts/shortcodes/generated/expert_rest_section.html
+++ b/docs/layouts/shortcodes/generated/expert_rest_section.html
@@ -32,6 +32,12 @@
<td>Duration</td>
<td>Duration from write after which cached checkpoints statistics
are cleaned up. For backwards compatibility, if no value is configured, <code
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
</tr>
+ <tr>
+ <td><h5>rest.cache.execution-graph.timeout</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Time-to-live for cached ExecutionGraph. If not set, defaults to the value of <code class="highlighter-rouge">web.refresh-interval</code>. Setting this to 0 means the cache will always fetch fresh data, which is useful for real-time state synchronization scenarios.</td>
+ </tr>
<tr>
<td><h5>rest.client.max-content-length</h5></td>
<td style="word-wrap: break-word;">104857600</td>
diff --git a/docs/layouts/shortcodes/generated/rest_configuration.html
b/docs/layouts/shortcodes/generated/rest_configuration.html
index 36a543dbbe7..3248b8d895b 100644
--- a/docs/layouts/shortcodes/generated/rest_configuration.html
+++ b/docs/layouts/shortcodes/generated/rest_configuration.html
@@ -50,6 +50,12 @@
<td>Duration</td>
<td>Duration from write after which cached checkpoints statistics
are cleaned up. For backwards compatibility, if no value is configured, <code
class="highlighter-rouge">web.refresh-interval</code> will be used instead.</td>
</tr>
+ <tr>
+ <td><h5>rest.cache.execution-graph.timeout</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Time-to-live for cached ExecutionGraph. If not set, defaults to the value of <code class="highlighter-rouge">web.refresh-interval</code>. Setting this to 0 means the cache will always fetch fresh data, which is useful for real-time state synchronization scenarios.</td>
+ </tr>
<tr>
<td><h5>rest.client.max-content-length</h5></td>
<td style="word-wrap: break-word;">104857600</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 5cda9c3ddbf..667cbda31b4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -224,6 +224,28 @@ public class RestOptions {
.withDescription(
"Maximum number of entries in the checkpoint statistics cache.");
+ /**
+ * Time-to-live for cached ExecutionGraph. If not set, defaults to the value of {@link
+ * WebOptions#REFRESH_INTERVAL}.
+ *
+ * <p>Setting this to 0 means the cache will always fetch fresh data, which is useful for
+ * real-time state synchronization scenarios where stale state information should be avoided.
+ */
+ @Documentation.Section(Documentation.Sections.EXPERT_REST)
+ public static final ConfigOption<Duration> CACHE_EXECUTION_GRAPH_TIMEOUT =
+ key("rest.cache.execution-graph.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Time-to-live for cached ExecutionGraph. "
+ + "If not set, defaults to the value of %s. "
+ + "Setting this to 0 means the cache will always fetch fresh data, "
+ + "which is useful for real-time state synchronization scenarios.",
+ code(WebOptions.REFRESH_INTERVAL.key()))
+ .build());
+
/** Enables the experimental flame graph feature. */
@Documentation.Section(Documentation.Sections.EXPERT_REST)
public static final ConfigOption<Boolean> ENABLE_FLAMEGRAPH =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index 6b1bbb8c2c0..4c7ed22935d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -56,7 +55,6 @@ public interface RestEndpointFactory<T extends
RestfulGateway> {
static ExecutionGraphCache createExecutionGraphCache(
RestHandlerConfiguration restConfiguration) {
return new DefaultExecutionGraphCache(
- restConfiguration.getTimeout(),
- Duration.ofMillis(restConfiguration.getRefreshInterval()));
+ restConfiguration.getTimeout(), restConfiguration.getExecutionGraphCacheTTL());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 0f3baf82b5a..8e7590947d0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -32,6 +32,8 @@ public class RestHandlerConfiguration {
private final long refreshInterval;
+ private final Duration executionGraphCacheTTL;
+
private final int checkpointHistorySize;
private final Duration checkpointCacheExpireAfterWrite;
@@ -50,6 +52,7 @@ public class RestHandlerConfiguration {
public RestHandlerConfiguration(
long refreshInterval,
+ Duration executionGraphCacheTTL,
int checkpointHistorySize,
Duration checkpointCacheExpireAfterWrite,
int checkpointCacheSize,
@@ -62,6 +65,11 @@ public class RestHandlerConfiguration {
refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
+ this.executionGraphCacheTTL = Preconditions.checkNotNull(executionGraphCacheTTL);
+ Preconditions.checkArgument(
+ !executionGraphCacheTTL.isNegative(),
+ "ExecutionGraph cache TTL should not be negative.");
+
this.checkpointHistorySize = checkpointHistorySize;
this.checkpointCacheExpireAfterWrite = checkpointCacheExpireAfterWrite;
this.checkpointCacheSize = checkpointCacheSize;
@@ -77,6 +85,10 @@ public class RestHandlerConfiguration {
return refreshInterval;
}
+ public Duration getExecutionGraphCacheTTL() {
+ return executionGraphCacheTTL;
+ }
+
public int getCheckpointHistorySize() {
return checkpointHistorySize;
}
@@ -112,6 +124,12 @@ public class RestHandlerConfiguration {
public static RestHandlerConfiguration fromConfiguration(Configuration
configuration) {
final long refreshInterval =
configuration.get(WebOptions.REFRESH_INTERVAL).toMillis();
+ // If CACHE_EXECUTION_GRAPH_TIMEOUT is not set, fall back to REFRESH_INTERVAL
+ final Duration executionGraphCacheTTL =
+ configuration
+ .getOptional(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT)
+ .orElse(Duration.ofMillis(refreshInterval));
+
final int checkpointHistorySize =
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE);
final Duration checkpointStatsSnapshotCacheExpireAfterWrite =
configuration.get(RestOptions.CACHE_CHECKPOINT_STATISTICS_TIMEOUT);
@@ -133,6 +151,7 @@ public class RestHandlerConfiguration {
return new RestHandlerConfiguration(
refreshInterval,
+ executionGraphCacheTTL,
checkpointHistorySize,
checkpointStatsSnapshotCacheExpireAfterWrite,
checkpointStatsSnapshotCacheSize,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index caf7c70055d..b7fac5d6d89 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link RestHandlerConfiguration}. */
class RestHandlerConfigurationTest {
@@ -142,4 +143,49 @@ class RestHandlerConfigurationTest {
RestHandlerConfiguration.fromConfiguration(config);
assertThat(restHandlerConfiguration.getCheckpointCacheSize()).isEqualTo(testCacheSize);
}
+
+ /** Tests that ExecutionGraph cache TTL defaults to refresh interval when not set. */
+ @Test
+ void testExecutionGraphCacheTTLDefault() {
+ final Duration refreshInterval = Duration.ofMillis(5000L);
+ final Configuration config = new Configuration();
+ config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
+ // When CACHE_EXECUTION_GRAPH_TIMEOUT is not set, it should default to REFRESH_INTERVAL
+ assertThat(restHandlerConfiguration.getExecutionGraphCacheTTL()).isEqualTo(refreshInterval);
+ }
+
+ /** Tests that ExecutionGraph cache TTL can be configured independently, including zero. */
+ @ParameterizedTest
+ @CsvSource({"10000", "0"})
+ void testExecutionGraphCacheTTLCustomValue(long cacheTTLMillis) {
+ final Duration refreshInterval = Duration.ofMillis(5000L);
+ final Duration cacheTTL = Duration.ofMillis(cacheTTLMillis);
+ final Configuration config = new Configuration();
+ config.set(WebOptions.REFRESH_INTERVAL, refreshInterval);
+ config.set(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT, cacheTTL);
+
+ RestHandlerConfiguration restHandlerConfiguration =
+ RestHandlerConfiguration.fromConfiguration(config);
+
+ // CACHE_EXECUTION_GRAPH_TIMEOUT should be independent of REFRESH_INTERVAL
+ assertThat(restHandlerConfiguration.getRefreshInterval())
+ .isEqualTo(refreshInterval.toMillis());
+ assertThat(restHandlerConfiguration.getExecutionGraphCacheTTL()).isEqualTo(cacheTTL);
+ }
+
+ /** Tests that negative ExecutionGraph cache TTL throws IllegalArgumentException. */
+ @Test
+ void testExecutionGraphCacheTTLNegativeValue() {
+ final Duration negativeCacheTTL = Duration.ofMillis(-1000L);
+ final Configuration config = new Configuration();
+ config.set(RestOptions.CACHE_EXECUTION_GRAPH_TIMEOUT, negativeCacheTTL);
+
+ assertThatThrownBy(() -> RestHandlerConfiguration.fromConfiguration(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("ExecutionGraph cache TTL should not be negative");
+ }
}