This is an automated email from the ASF dual-hosted git repository.
weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4931d08d7b2 [FLINK-38229][runtime-web] Enhanced Job History Retention
Policies for HistoryServer (#26902)
4931d08d7b2 is described below
commit 4931d08d7b2eb9d4d8d915f32456c9909c3e423d
Author: Yuepeng Pan <[email protected]>
AuthorDate: Fri Oct 31 14:22:17 2025 +0800
[FLINK-38229][runtime-web] Enhanced Job History Retention Policies for
HistoryServer (#26902)
---
.../generated/history_server_configuration.html | 8 +-
.../flink/configuration/HistoryServerOptions.java | 69 +++++++-
.../runtime/webmonitor/history/HistoryServer.java | 10 +-
.../history/HistoryServerArchiveFetcher.java | 24 +--
.../retaining/CompositeJobRetainedStrategy.java | 109 +++++++++++++
.../history/retaining/JobRetainedStrategy.java | 34 ++++
.../CompositeJobRetainedStrategyTest.java | 176 +++++++++++++++++++++
7 files changed, 402 insertions(+), 28 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/history_server_configuration.html
b/docs/layouts/shortcodes/generated/history_server_configuration.html
index 3f80eb1fbdf..8b171ba627c 100644
--- a/docs/layouts/shortcodes/generated/history_server_configuration.html
+++ b/docs/layouts/shortcodes/generated/history_server_configuration.html
@@ -30,7 +30,13 @@
<td><h5>historyserver.archive.retained-jobs</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>The maximum number of jobs to retain in each archive directory
defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no
limit to the number of archives. If set to `0` or less than `-1` HistoryServer
will throw an <code
class="highlighter-rouge">IllegalConfigurationException</code>. </td>
+ <td>The maximum number of jobs to retain in each archive directory
defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>.
<ul><li>If the option is not specified as a positive number without specifying
<code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all
of the jobs archives will be retained. </li><li>If the option is specified as a
positive number without specifying a value of <code
class="highlighter-rouge">historyserver.arc [...]
+ </tr>
+ <tr>
+ <td><h5>historyserver.archive.retained-ttl</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The time-to-live duration to retain the jobs archived in each
archive directory defined by <code
class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the
option is not specified without specifying <code
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, all of
the jobs archives will be retained. </li><li>If the option is specified without
specifying <code
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, the jobs
[...]
</tr>
<tr>
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 0ead9cdeed1..1628d266f35 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
/** The set of configuration options relating to the HistoryServer. */
@PublicEvolving
@@ -126,21 +127,75 @@ public class HistoryServerOptions {
"Enable HTTPs access to the HistoryServer web
frontend. This is applicable only when the"
+ " global SSL flag security.ssl.enabled
is set to true.");
+ private static final String HISTORY_SERVER_RETAINED_JOBS_KEY =
+ "historyserver.archive.retained-jobs";
+ private static final String HISTORY_SERVER_RETAINED_TTL_KEY =
+ "historyserver.archive.retained-ttl";
+ private static final String NOTE_MESSAGE =
+ "Note, when there are multiple history server instances, two
recommended approaches when using this option are: ";
+ private static final String CONFIGURE_SINGLE_INSTANCE =
+ "Specify the option in only one HistoryServer instance to avoid
errors caused by multiple instances simultaneously cleaning up remote files, ";
+ private static final String CONFIGURE_CONSISTENT =
+ "Or you can keep the value of this configuration consistent across
them. ";
+
public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
- key("historyserver.archive.retained-jobs")
+ key(HISTORY_SERVER_RETAINED_JOBS_KEY)
.intType()
.defaultValue(-1)
.withDescription(
Description.builder()
.text(
- String.format(
- "The maximum number of
jobs to retain in each archive directory defined by `%s`. ",
-
HISTORY_SERVER_ARCHIVE_DIRS.key()))
- .text(
- "If set to `-1`(default), there is
no limit to the number of archives. ")
+ "The maximum number of jobs to
retain in each archive directory defined by %s. ",
+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+ .list(
+ text(
+ "If the option is not
specified as a positive number without specifying %s, all of the jobs archives
will be retained. ",
+
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
+ text(
+ "If the option is
specified as a positive number without specifying a value of %s, the jobs
archive whose order index based modification time is equals to or less than the
value will be retained. ",
+
code(HISTORY_SERVER_RETAINED_TTL_KEY)),
+ text(
+ "If this option is
specified as a positive number together with the specified %s option, the job
archive will be removed if its TTL has expired or the retained job count has
been reached. ",
+
code(HISTORY_SERVER_RETAINED_TTL_KEY)))
.text(
- "If set to `0` or less than `-1`
HistoryServer will throw an %s. ",
+ "If set to %s or less than %s,
HistoryServer will throw an %s. ",
+ code("0"),
+ code("-1"),
code("IllegalConfigurationException"))
+ .linebreak()
+ .text(NOTE_MESSAGE)
+ .list(
+ text(CONFIGURE_SINGLE_INSTANCE),
+ text(CONFIGURE_CONSISTENT))
+ .build());
+
+ public static final ConfigOption<Duration> HISTORY_SERVER_RETAINED_TTL =
+ key(HISTORY_SERVER_RETAINED_TTL_KEY)
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The time-to-live duration to
retain the jobs archived in each archive directory defined by %s. ",
+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+ .list(
+ text(
+ "If the option is not
specified without specifying %s, all of the jobs archives will be retained. ",
+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
+ text(
+ "If the option is
specified without specifying %s, the jobs archive whose modification time in
the time-to-live duration will be retained. ",
+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
+ text(
+ "If this option is
specified as a positive time duration together with the %s option, the job
archive will be removed if its TTL has expired or the retained job count has
been reached. ",
+
code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
+ .text(
+ "If set to equal to or less than
%s milliseconds, HistoryServer will throw an %s. ",
+ code("0"),
code("IllegalConfigurationException"))
+ .linebreak()
+ .text(NOTE_MESSAGE)
+ .list(
+ text(CONFIGURE_SINGLE_INSTANCE),
+ text(CONFIGURE_CONSISTENT))
.build());
private HistoryServerOptions() {}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 0162d994d30..51c5a067598 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HistoryServerOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
@@ -38,6 +37,7 @@ import
org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Runnables;
+import
org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.ExceptionUtils;
@@ -230,19 +230,13 @@ public class HistoryServer {
refreshIntervalMillis =
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
- int maxHistorySize =
config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
- if (maxHistorySize == 0 || maxHistorySize < -1) {
- throw new IllegalConfigurationException(
- "Cannot set %s to 0 or less than -1",
- HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
- }
archiveFetcher =
new HistoryServerArchiveFetcher(
refreshDirs,
webDir,
jobArchiveEventListener,
cleanupExpiredArchives,
- maxHistorySize);
+ CompositeJobRetainedStrategy.createFrom(config));
this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index a8d782354a0..78452c22179 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import
org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -112,8 +113,7 @@ class HistoryServerArchiveFetcher {
private final List<HistoryServer.RefreshLocation> refreshDirs;
private final Consumer<ArchiveEvent> jobArchiveEventListener;
private final boolean processExpiredArchiveDeletion;
- private final boolean processBeyondLimitArchiveDeletion;
- private final int maxHistorySize;
+ private final JobRetainedStrategy jobRetainedStrategy;
/** Cache of all available jobs identified by their id. */
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,13 +127,12 @@ class HistoryServerArchiveFetcher {
File webDir,
Consumer<ArchiveEvent> jobArchiveEventListener,
boolean cleanupExpiredArchives,
- int maxHistorySize)
+ JobRetainedStrategy jobRetainedStrategy)
throws IOException {
this.refreshDirs = checkNotNull(refreshDirs);
this.jobArchiveEventListener = jobArchiveEventListener;
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
- this.maxHistorySize = maxHistorySize;
- this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
+ this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new
HashSet<>());
@@ -159,7 +158,7 @@ class HistoryServerArchiveFetcher {
Map<Path, Set<String>> jobsToRemove = new HashMap<>();
cachedArchivesPerRefreshDirectory.forEach(
(path, archives) -> jobsToRemove.put(path, new
HashSet<>(archives)));
- Map<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<>();
+ Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
Path refreshDir = refreshLocation.getPath();
LOG.debug("Checking archive directory {}.", refreshDir);
@@ -176,7 +175,7 @@ class HistoryServerArchiveFetcher {
continue;
}
- int historySize = 0;
+ int fileOrderedIndexOnModifiedTime = 0;
for (FileStatus jobArchive : jobArchives) {
Path jobArchivePath = jobArchive.getPath();
String jobID = jobArchivePath.getName();
@@ -186,9 +185,10 @@ class HistoryServerArchiveFetcher {
jobsToRemove.get(refreshDir).remove(jobID);
- historySize++;
- if (historySize > maxHistorySize &&
processBeyondLimitArchiveDeletion) {
- archivesBeyondSizeLimit
+ fileOrderedIndexOnModifiedTime++;
+ if (!jobRetainedStrategy.shouldRetain(
+ jobArchive, fileOrderedIndexOnModifiedTime)) {
+ archivesBeyondRetainedLimit
.computeIfAbsent(refreshDir, ignored -> new
HashSet<>())
.add(jobArchivePath);
continue;
@@ -220,8 +220,8 @@ class HistoryServerArchiveFetcher {
&& processExpiredArchiveDeletion) {
events.addAll(cleanupExpiredJobs(jobsToRemove));
}
- if (!archivesBeyondSizeLimit.isEmpty() &&
processBeyondLimitArchiveDeletion) {
-
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
+ if (!archivesBeyondRetainedLimit.isEmpty()) {
+
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
}
if (!events.isEmpty()) {
updateJobOverview(webOverviewDir, webDir);
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
new file mode 100644
index 00000000000..acd35c93f79
--- /dev/null
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileStatus;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+
+/** The retained strategy. */
+public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+
+ public static JobRetainedStrategy createFrom(ReadableConfig config) {
+ int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+ Optional<Duration> retainedTtlOpt =
config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+ return new CompositeJobRetainedStrategy(
+ new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
+ new
TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+ }
+
+ private final List<JobRetainedStrategy> strategies;
+
+ CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+ this.strategies =
+ strategies == null || strategies.length == 0
+ ? Collections.emptyList()
+ : Arrays.asList(strategies);
+ }
+
+ @Override
+ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+ if (strategies.isEmpty()) {
+ return true;
+ }
+ return strategies.stream().allMatch(s -> s.shouldRetain(file,
fileOrderedIndex));
+ }
+}
+
+/** The time to live based retained strategy. */
+class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+
+ @Nullable private final Duration ttlThreshold;
+
+ TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+ if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than 0 milliseconds",
+ HISTORY_SERVER_RETAINED_TTL.key());
+ }
+ this.ttlThreshold = ttlThreshold;
+ }
+
+ @Override
+ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+ if (ttlThreshold == null) {
+ return true;
+ }
+ return Instant.now().toEpochMilli() - file.getModificationTime() <
ttlThreshold.toMillis();
+ }
+}
+
+/** The job quantity based retained strategy. */
+class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+
+ private final int quantityThreshold;
+
+ QuantityJobRetainedStrategy(int quantityThreshold) {
+ if (quantityThreshold == 0 || quantityThreshold < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1",
HISTORY_SERVER_RETAINED_JOBS.key());
+ }
+ this.quantityThreshold = quantityThreshold;
+ }
+
+ @Override
+ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
+ if (quantityThreshold == -1) {
+ return true;
+ }
+ return fileOrderedIndex <= quantityThreshold;
+ }
+}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
new file mode 100644
index 00000000000..2ef991698bf
--- /dev/null
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.core.fs.FileStatus;
+
+/** To define the strategy interface to judge whether the file should be
retained. */
+public interface JobRetainedStrategy {
+
+ /**
+ * Judge whether the file should be retained.
+ *
+ * @param file the target file to judge.
+ * @param fileOrderedIndex the specified order index position of the
target file,
+ * @return The result that indicates whether the file should be retained.
+ */
+ boolean shouldRetain(FileStatus file, int fileOrderedIndex);
+}
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
new file mode 100644
index 00000000000..e8983967df5
--- /dev/null
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history.retaining;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Testing for {@link CompositeJobRetainedStrategy}. */
+class CompositeJobRetainedStrategyTest {
+
+ @Test
+ void testTimeToLiveBasedJobRetainedStrategy() {
+ final Configuration conf = new Configuration();
+
+ // Test for invalid option value.
+ conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
+ assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ .isInstanceOf(IllegalConfigurationException.class);
+ // Skipped for option value that is less than 0 milliseconds, which
will throw a
+ // java.lang.NumberFormatException caused by TimeUtils.
+
+ conf.removeConfig(HISTORY_SERVER_RETAINED_TTL);
+
+ // Test the case where no specific retention policy is configured,
i.e., all archived files
+ // are retained.
+ JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+ assertThat(
+ strategy.shouldRetain(
+ new TestingFileStatus(
+ Instant.now().toEpochMilli()
+ -
Duration.ofMinutes(1).toMillis()),
+ 1))
+ .isTrue();
+
+ // Test the case where TTL-based retention policies is specified only.
+ conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
+ strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+ assertThat(
+ strategy.shouldRetain(
+ new TestingFileStatus(
+ Instant.now().toEpochMilli()
+ -
Duration.ofMinutes(1).toMillis()),
+ 1))
+ .isFalse();
+ }
+
+ @Test
+ void testQuantityBasedJobRetainedStrategy() {
+ final Configuration conf = new Configuration();
+
+ // Test for invalid option value.
+ conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
+ assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ .isInstanceOf(IllegalConfigurationException.class);
+ conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
+ assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ .isInstanceOf(IllegalConfigurationException.class);
+
+ conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+
+ // Test the case where no specific retention policy is configured,
i.e., all archived files
+ // are retained.
+ JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
+
+ // Test the case where QUANTITY-based retention policies is specified
only.
+ conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
+ strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(),
3)).isFalse();
+ }
+
+ @Test
+ void testCompositeBasedJobRetainedStrategy() {
+
+ final long outOfTtlMillis =
+ Instant.now().toEpochMilli() -
Duration.ofMinutes(2L).toMillis();
+
+ // Test the case where no specific retention policy is configured,
i.e., all archived files
+ // are retained.
+ final Configuration conf = new Configuration();
+ JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 1)).isTrue();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(),
10)).isTrue();
+ assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 3)).isTrue();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+
+ // Test the case where both retention policies are specified.
+ conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
+ conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
+ strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 1)).isFalse();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(),
10)).isFalse();
+ assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 3)).isFalse();
+ assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
+ }
+
+ private static final class TestingFileStatus implements FileStatus {
+
+ private final long modificationTime;
+
+ TestingFileStatus() {
+ this(Instant.now().toEpochMilli());
+ }
+
+ TestingFileStatus(long modificationTime) {
+ this.modificationTime = modificationTime;
+ }
+
+ @Override
+ public long getLen() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockSize() {
+ return 0;
+ }
+
+ @Override
+ public short getReplication() {
+ return 0;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
+ @Override
+ public long getAccessTime() {
+ return 0;
+ }
+
+ @Override
+ public boolean isDir() {
+ return false;
+ }
+
+ @Override
+ public Path getPath() {
+ return null;
+ }
+ }
+}