This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4931d08d7b2 [FLINK-38229][runtime-web] Enhanced Job History Retention 
Policies for HistoryServer (#26902)
4931d08d7b2 is described below

commit 4931d08d7b2eb9d4d8d915f32456c9909c3e423d
Author: Yuepeng Pan <[email protected]>
AuthorDate: Fri Oct 31 14:22:17 2025 +0800

    [FLINK-38229][runtime-web] Enhanced Job History Retention Policies for 
HistoryServer (#26902)
---
 .../generated/history_server_configuration.html    |   8 +-
 .../flink/configuration/HistoryServerOptions.java  |  69 +++++++-
 .../runtime/webmonitor/history/HistoryServer.java  |  10 +-
 .../history/HistoryServerArchiveFetcher.java       |  24 +--
 .../retaining/CompositeJobRetainedStrategy.java    | 109 +++++++++++++
 .../history/retaining/JobRetainedStrategy.java     |  34 ++++
 .../CompositeJobRetainedStrategyTest.java          | 176 +++++++++++++++++++++
 7 files changed, 402 insertions(+), 28 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/history_server_configuration.html 
b/docs/layouts/shortcodes/generated/history_server_configuration.html
index 3f80eb1fbdf..8b171ba627c 100644
--- a/docs/layouts/shortcodes/generated/history_server_configuration.html
+++ b/docs/layouts/shortcodes/generated/history_server_configuration.html
@@ -30,7 +30,13 @@
             <td><h5>historyserver.archive.retained-jobs</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>The maximum number of jobs to retain in each archive directory 
defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no 
limit to the number of archives. If set to `0` or less than `-1` HistoryServer 
will throw an <code 
class="highlighter-rouge">IllegalConfigurationException</code>. </td>
+            <td>The maximum number of jobs to retain in each archive directory 
defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. 
<ul><li>If the option is not specified as a positive number without specifying 
<code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all 
of the jobs archives will be retained. </li><li>If the option is specified as a 
positive number without specifying a value of <code 
class="highlighter-rouge">historyserver.arc [...]
+        </tr>
+        <tr>
+            <td><h5>historyserver.archive.retained-ttl</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The time-to-live duration to retain the jobs archived in each 
archive directory defined by <code 
class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the 
option is not specified without specifying <code 
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, all of 
the jobs archives will be retained. </li><li>If the option is specified without 
specifying <code 
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, the jobs 
[...]
         </tr>
         <tr>
             <td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 0ead9cdeed1..1628d266f35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /** The set of configuration options relating to the HistoryServer. */
 @PublicEvolving
@@ -126,21 +127,75 @@ public class HistoryServerOptions {
                             "Enable HTTPs access to the HistoryServer web 
frontend. This is applicable only when the"
                                     + " global SSL flag security.ssl.enabled 
is set to true.");
 
+    private static final String HISTORY_SERVER_RETAINED_JOBS_KEY =
+            "historyserver.archive.retained-jobs";
+    private static final String HISTORY_SERVER_RETAINED_TTL_KEY =
+            "historyserver.archive.retained-ttl";
+    private static final String NOTE_MESSAGE =
+            "Note, when there are multiple history server instances, two 
recommended approaches when using this option are: ";
+    private static final String CONFIGURE_SINGLE_INSTANCE =
+            "Specify the option in only one HistoryServer instance to avoid 
errors caused by multiple instances simultaneously cleaning up remote files, ";
+    private static final String CONFIGURE_CONSISTENT =
+            "Or you can keep the value of this configuration consistent across 
them. ";
+
     public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
-            key("historyserver.archive.retained-jobs")
+            key(HISTORY_SERVER_RETAINED_JOBS_KEY)
                     .intType()
                     .defaultValue(-1)
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            String.format(
-                                                    "The maximum number of 
jobs to retain in each archive directory defined by `%s`. ",
-                                                    
HISTORY_SERVER_ARCHIVE_DIRS.key()))
-                                    .text(
-                                            "If set to `-1`(default), there is 
no limit to the number of archives. ")
+                                            "The maximum number of jobs to 
retain in each archive directory defined by %s. ",
+                                            
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .list(
+                                            text(
+                                                    "If the option is not 
specified as a positive number without specifying %s, all of the jobs archives 
will be retained. ",
+                                                    
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
+                                            text(
+                                                    "If the option is 
specified as a positive number without specifying a value of %s, the jobs 
archive whose order index based modification time is equals to or less than the 
value will be retained. ",
+                                                    
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
+                                            text(
+                                                    "If this option is 
specified as a positive number together with the specified %s option, the job 
archive will be removed if its TTL has expired or the retained job count has 
been reached. ",
+                                                    
code(HISTORY_SERVER_RETAINED_TTL_KEY)))
                                     .text(
-                                            "If set to `0` or less than `-1` 
HistoryServer will throw an %s. ",
+                                            "If set to %s or less than %s, 
HistoryServer will throw an %s. ",
+                                            code("0"),
+                                            code("-1"),
                                             
code("IllegalConfigurationException"))
+                                    .linebreak()
+                                    .text(NOTE_MESSAGE)
+                                    .list(
+                                            text(CONFIGURE_SINGLE_INSTANCE),
+                                            text(CONFIGURE_CONSISTENT))
+                                    .build());
+
+    public static final ConfigOption<Duration> HISTORY_SERVER_RETAINED_TTL =
+            key(HISTORY_SERVER_RETAINED_TTL_KEY)
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The time-to-live duration to 
retain the jobs archived in each archive directory defined by %s. ",
+                                            
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .list(
+                                            text(
+                                                    "If the option is not 
specified without specifying %s, all of the jobs archives will be retained. ",
+                                                    
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
+                                            text(
+                                                    "If the option is 
specified without specifying %s, the jobs archive whose modification time in 
the time-to-live duration will be retained. ",
+                                                    
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
+                                            text(
+                                                    "If this option is 
specified as a positive time duration together with the %s option, the job 
archive will be removed if its TTL has expired or the retained job count has 
been reached. ",
+                                                    
code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
+                                    .text(
+                                            "If set to equal to or less than 
%s milliseconds, HistoryServer will throw an %s. ",
+                                            code("0"), 
code("IllegalConfigurationException"))
+                                    .linebreak()
+                                    .text(NOTE_MESSAGE)
+                                    .list(
+                                            text(CONFIGURE_SINGLE_INSTANCE),
+                                            text(CONFIGURE_CONSISTENT))
                                     .build());
 
     private HistoryServerOptions() {}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 0162d994d30..51c5a067598 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HistoryServerOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -38,6 +37,7 @@ import 
org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Runnables;
+import 
org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
 import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.ExceptionUtils;
@@ -230,19 +230,13 @@ public class HistoryServer {
 
         refreshIntervalMillis =
                 
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
-        int maxHistorySize = 
config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
-        if (maxHistorySize == 0 || maxHistorySize < -1) {
-            throw new IllegalConfigurationException(
-                    "Cannot set %s to 0 or less than -1",
-                    HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
-        }
         archiveFetcher =
                 new HistoryServerArchiveFetcher(
                         refreshDirs,
                         webDir,
                         jobArchiveEventListener,
                         cleanupExpiredArchives,
-                        maxHistorySize);
+                        CompositeJobRetainedStrategy.createFrom(config));
 
         this.shutdownHook =
                 ShutdownHookUtil.addShutdownHook(
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index a8d782354a0..78452c22179 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import 
org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
@@ -112,8 +113,7 @@ class HistoryServerArchiveFetcher {
     private final List<HistoryServer.RefreshLocation> refreshDirs;
     private final Consumer<ArchiveEvent> jobArchiveEventListener;
     private final boolean processExpiredArchiveDeletion;
-    private final boolean processBeyondLimitArchiveDeletion;
-    private final int maxHistorySize;
+    private final JobRetainedStrategy jobRetainedStrategy;
 
     /** Cache of all available jobs identified by their id. */
     private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,13 +127,12 @@ class HistoryServerArchiveFetcher {
             File webDir,
             Consumer<ArchiveEvent> jobArchiveEventListener,
             boolean cleanupExpiredArchives,
-            int maxHistorySize)
+            JobRetainedStrategy jobRetainedStrategy)
             throws IOException {
         this.refreshDirs = checkNotNull(refreshDirs);
         this.jobArchiveEventListener = jobArchiveEventListener;
         this.processExpiredArchiveDeletion = cleanupExpiredArchives;
-        this.maxHistorySize = maxHistorySize;
-        this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
+        this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
         this.cachedArchivesPerRefreshDirectory = new HashMap<>();
         for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
             cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new 
HashSet<>());
@@ -159,7 +158,7 @@ class HistoryServerArchiveFetcher {
             Map<Path, Set<String>> jobsToRemove = new HashMap<>();
             cachedArchivesPerRefreshDirectory.forEach(
                     (path, archives) -> jobsToRemove.put(path, new 
HashSet<>(archives)));
-            Map<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<>();
+            Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
             for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
                 Path refreshDir = refreshLocation.getPath();
                 LOG.debug("Checking archive directory {}.", refreshDir);
@@ -176,7 +175,7 @@ class HistoryServerArchiveFetcher {
                     continue;
                 }
 
-                int historySize = 0;
+                int fileOrderedIndexOnModifiedTime = 0;
                 for (FileStatus jobArchive : jobArchives) {
                     Path jobArchivePath = jobArchive.getPath();
                     String jobID = jobArchivePath.getName();
@@ -186,9 +185,10 @@ class HistoryServerArchiveFetcher {
 
                     jobsToRemove.get(refreshDir).remove(jobID);
 
-                    historySize++;
-                    if (historySize > maxHistorySize && 
processBeyondLimitArchiveDeletion) {
-                        archivesBeyondSizeLimit
+                    fileOrderedIndexOnModifiedTime++;
+                    if (!jobRetainedStrategy.shouldRetain(
+                            jobArchive, fileOrderedIndexOnModifiedTime)) {
+                        archivesBeyondRetainedLimit
                                 .computeIfAbsent(refreshDir, ignored -> new 
HashSet<>())
                                 .add(jobArchivePath);
                         continue;
@@ -220,8 +220,8 @@ class HistoryServerArchiveFetcher {
                     && processExpiredArchiveDeletion) {
                 events.addAll(cleanupExpiredJobs(jobsToRemove));
             }
-            if (!archivesBeyondSizeLimit.isEmpty() && 
processBeyondLimitArchiveDeletion) {
-                
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
+            if (!archivesBeyondRetainedLimit.isEmpty()) {
+                
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
             }
             if (!events.isEmpty()) {
                 updateJobOverview(webOverviewDir, webDir);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
new file mode 100644
index 00000000000..acd35c93f79
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileStatus;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
+import static 
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+
+/** The retained strategy. */
+public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+
+    public static JobRetainedStrategy createFrom(ReadableConfig config) {
+        int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+        Optional<Duration> retainedTtlOpt = 
config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+        return new CompositeJobRetainedStrategy(
+                new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
+                new 
TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+    }
+
+    private final List<JobRetainedStrategy> strategies;
+
+    CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+        this.strategies =
+                strategies == null || strategies.length == 0
+                        ? Collections.emptyList()
+                        : Arrays.asList(strategies);
+    }
+
+    @Override
+    public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+        if (strategies.isEmpty()) {
+            return true;
+        }
+        return strategies.stream().allMatch(s -> s.shouldRetain(file, 
fileOrderedIndex));
+    }
+}
+
+/** The time to live based retained strategy. */
+class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+
+    @Nullable private final Duration ttlThreshold;
+
+    TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+        if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
+            throw new IllegalConfigurationException(
+                    "Cannot set %s to 0 or less than 0 milliseconds",
+                    HISTORY_SERVER_RETAINED_TTL.key());
+        }
+        this.ttlThreshold = ttlThreshold;
+    }
+
+    @Override
+    public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+        if (ttlThreshold == null) {
+            return true;
+        }
+        return Instant.now().toEpochMilli() - file.getModificationTime() < 
ttlThreshold.toMillis();
+    }
+}
+
+/** The job quantity based retained strategy. */
+class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+
+    private final int quantityThreshold;
+
+    QuantityJobRetainedStrategy(int quantityThreshold) {
+        if (quantityThreshold == 0 || quantityThreshold < -1) {
+            throw new IllegalConfigurationException(
+                    "Cannot set %s to 0 or less than -1", 
HISTORY_SERVER_RETAINED_JOBS.key());
+        }
+        this.quantityThreshold = quantityThreshold;
+    }
+
+    @Override
+    public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+        if (quantityThreshold == -1) {
+            return true;
+        }
+        return fileOrderedIndex <= quantityThreshold;
+    }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
new file mode 100644
index 00000000000..2ef991698bf
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.core.fs.FileStatus;
+
+/** To define the strategy interface to judge whether the file should be 
retained. */
+public interface JobRetainedStrategy {
+
+    /**
+     * Judge whether the file should be retained.
+     *
+     * @param file the target file to judge.
+     * @param fileOrderedIndex the specified order index position of the 
target file,
+     * @return The result that indicates whether the file should be retained.
+     */
+    boolean shouldRetain(FileStatus file, int fileOrderedIndex);
+}
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
new file mode 100644
index 00000000000..e8983967df5
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import static 
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
+import static 
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Testing for {@link CompositeJobRetainedStrategy}. */
+class CompositeJobRetainedStrategyTest {
+
+    @Test
+    void testTimeToLiveBasedJobRetainedStrategy() {
+        final Configuration conf = new Configuration();
+
+        // Test for invalid option value.
+        conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
+        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+                .isInstanceOf(IllegalConfigurationException.class);
+        // Skipped for option value that is less than 0 milliseconds, which 
will throw a
+        // java.lang.NumberFormatException caused by TimeUtils.
+
+        conf.removeConfig(HISTORY_SERVER_RETAINED_TTL);
+
+        // Test the case where no specific retention policy is configured, 
i.e., all archived files
+        // are retained.
+        JobRetainedStrategy strategy = 
CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+        assertThat(
+                        strategy.shouldRetain(
+                                new TestingFileStatus(
+                                        Instant.now().toEpochMilli()
+                                                - 
Duration.ofMinutes(1).toMillis()),
+                                1))
+                .isTrue();
+
+        // Test the case where TTL-based retention policies is specified only.
+        conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
+        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+        assertThat(
+                        strategy.shouldRetain(
+                                new TestingFileStatus(
+                                        Instant.now().toEpochMilli()
+                                                - 
Duration.ofMinutes(1).toMillis()),
+                                1))
+                .isFalse();
+    }
+
+    @Test
+    void testQuantityBasedJobRetainedStrategy() {
+        final Configuration conf = new Configuration();
+
+        // Test for invalid option value.
+        conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
+        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+                .isInstanceOf(IllegalConfigurationException.class);
+        conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
+        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+                .isInstanceOf(IllegalConfigurationException.class);
+
+        conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+
+        // Test the case where no specific retention policy is configured, 
i.e., all archived files
+        // are retained.
+        JobRetainedStrategy strategy = 
CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
+
+        // Test the case where QUANTITY-based retention policies is specified 
only.
+        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
+        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 
3)).isFalse();
+    }
+
+    @Test
+    void testCompositeBasedJobRetainedStrategy() {
+
+        final long outOfTtlMillis =
+                Instant.now().toEpochMilli() - 
Duration.ofMinutes(2L).toMillis();
+
+        // Test the case where no specific retention policy is configured, 
i.e., all archived files
+        // are retained.
+        final Configuration conf = new Configuration();
+        JobRetainedStrategy strategy = 
CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new 
TestingFileStatus(outOfTtlMillis), 1)).isTrue();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 
10)).isTrue();
+        assertThat(strategy.shouldRetain(new 
TestingFileStatus(outOfTtlMillis), 3)).isTrue();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+
+        // Test the case where both retention policies are specified.
+        conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
+        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
+        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        assertThat(strategy.shouldRetain(new 
TestingFileStatus(outOfTtlMillis), 1)).isFalse();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 
10)).isFalse();
+        assertThat(strategy.shouldRetain(new 
TestingFileStatus(outOfTtlMillis), 3)).isFalse();
+        assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+    }
+
+    private static final class TestingFileStatus implements FileStatus {
+
+        private final long modificationTime;
+
+        TestingFileStatus() {
+            this(Instant.now().toEpochMilli());
+        }
+
+        TestingFileStatus(long modificationTime) {
+            this.modificationTime = modificationTime;
+        }
+
+        @Override
+        public long getLen() {
+            return 0;
+        }
+
+        @Override
+        public long getBlockSize() {
+            return 0;
+        }
+
+        @Override
+        public short getReplication() {
+            return 0;
+        }
+
+        @Override
+        public long getModificationTime() {
+            return modificationTime;
+        }
+
+        @Override
+        public long getAccessTime() {
+            return 0;
+        }
+
+        @Override
+        public boolean isDir() {
+            return false;
+        }
+
+        @Override
+        public Path getPath() {
+            return null;
+        }
+    }
+}

Reply via email to