This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e3adbdce5a0 [FLINK-38777][history] HistoryServer supports application archives
e3adbdce5a0 is described below
commit e3adbdce5a02a67656b49f765ddc328846d22082
Author: Yi Zhang <[email protected]>
AuthorDate: Fri Jan 16 16:34:56 2026 +0800
[FLINK-38777][history] HistoryServer supports application archives
---
.../generated/history_server_configuration.html | 18 +-
.../flink/configuration/HistoryServerOptions.java | 93 +++-
.../runtime/webmonitor/history/HistoryServer.java | 41 +-
.../HistoryServerApplicationArchiveFetcher.java | 273 ++++++++++++
.../history/HistoryServerArchiveFetcher.java | 274 ++++++------
...dStrategy.java => ArchiveRetainedStrategy.java} | 2 +-
....java => CompositeArchiveRetainedStrategy.java} | 48 +-
.../webmonitor/history/HistoryServerTest.java | 488 ++++++++++++++++++++-
...a => CompositeArchiveRetainedStrategyTest.java} | 94 +++-
9 files changed, 1113 insertions(+), 218 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/history_server_configuration.html
b/docs/layouts/shortcodes/generated/history_server_configuration.html
index 8b171ba627c..db696545a18 100644
--- a/docs/layouts/shortcodes/generated/history_server_configuration.html
+++ b/docs/layouts/shortcodes/generated/history_server_configuration.html
@@ -8,11 +8,17 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>historyserver.archive.clean-expired-applications</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+            <td>Whether HistoryServer should clean up applications that are no longer present in the archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. </td>
+ </tr>
<tr>
<td><h5>historyserver.archive.clean-expired-jobs</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether HistoryServer should cleanup jobs that are no longer
present `historyserver.archive.fs.dir`.</td>
+            <td>Whether HistoryServer should clean up jobs that are no longer present in the archive directory defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>. <br />Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).</td>
</tr>
<tr>
<td><h5>historyserver.archive.fs.dir</h5></td>
@@ -26,17 +32,23 @@
<td>Duration</td>
<td>Interval for refreshing the archived job directories.</td>
</tr>
+ <tr>
+ <td><h5>historyserver.archive.retained-applications</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>The maximum number of applications to retain in each archive
directory defined by <code
class="highlighter-rouge">historyserver.archive.fs.dir</code>. This option
works together with the TTL (see <code
class="highlighter-rouge">historyserver.archive.retained-ttl</code>). Archived
entities will be removed if their TTL has expired or the retention count limit
has been reached. <br />If set to <code class="highlighter-rouge">-1</code>
(default), there is no limit to the numb [...]
+ </tr>
<tr>
<td><h5>historyserver.archive.retained-jobs</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>The maximum number of jobs to retain in each archive directory
defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>.
<ul><li>If the option is not specified as a positive number without specifying
<code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all
of the jobs archives will be retained. </li><li>If the option is specified as a
positive number without specifying a value of <code
class="highlighter-rouge">historyserver.arc [...]
+ <td>The maximum number of jobs to retain in each archive directory
defined by <code class="highlighter-rouge">historyserver.archive.fs.dir</code>.
<ul><li>If the option is not specified as a positive number without specifying
<code class="highlighter-rouge">historyserver.archive.retained-ttl</code>, all
of the jobs archives will be retained. </li><li>If the option is specified as a
positive number without specifying a value of <code
class="highlighter-rouge">historyserver.arc [...]
</tr>
<tr>
<td><h5>historyserver.archive.retained-ttl</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
- <td>The time-to-live duration to retain the jobs archived in each
archive directory defined by <code
class="highlighter-rouge">historyserver.archive.fs.dir</code>. <ul><li>If the
option is not specified without specifying <code
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, all of
the jobs archives will be retained. </li><li>If the option is specified without
specifying <code
class="highlighter-rouge">historyserver.archive.retained-jobs</code>, the jobs
[...]
+ <td>The time-to-live duration to retain the archived entities
(jobs and applications) in each archive directory defined by <code
class="highlighter-rouge">historyserver.archive.fs.dir</code>. This option
works together with the retention count limits (see <code
class="highlighter-rouge">historyserver.archive.retained-applications</code>
and <code
class="highlighter-rouge">historyserver.archive.retained-jobs</code>). Archived
entities will be removed if their TTL has expired o [...]
</tr>
<tr>
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index 1628d266f35..1677dc2e1a0 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -51,17 +51,6 @@ public class HistoryServerOptions {
+ " monitor these directories for archived
jobs. You can configure the JobManager to archive jobs to a"
+ " directory via
`jobmanager.archive.fs.dir`.");
- /** If this option is enabled then deleted job archives are also deleted
from HistoryServer. */
- public static final ConfigOption<Boolean>
HISTORY_SERVER_CLEANUP_EXPIRED_JOBS =
- key("historyserver.archive.clean-expired-jobs")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- String.format(
- "Whether HistoryServer should cleanup jobs"
- + " that are no longer present
`%s`.",
- HISTORY_SERVER_ARCHIVE_DIRS.key()));
-
/**
* Pattern of the log URL of TaskManager. The HistoryServer will generate
actual URLs from it.
*/
@@ -137,6 +126,24 @@ public class HistoryServerOptions {
"Specify the option in only one HistoryServer instance to avoid
errors caused by multiple instances simultaneously cleaning up remote files, ";
private static final String CONFIGURE_CONSISTENT =
"Or you can keep the value of this configuration consistent across
them. ";
+ private static final String LEGACY_NOTE_MESSAGE =
+ "Note: This option applies only to legacy job archives created
before the introduction of application archiving (FLINK-38761).";
+ private static final String RETAINED_STRATEGY_MESSAGE =
+ "Archived entities will be removed if their TTL has expired or the
retention count limit has been reached. ";
+
+ /** If this option is enabled then deleted job archives are also deleted
from HistoryServer. */
+ public static final ConfigOption<Boolean>
HISTORY_SERVER_CLEANUP_EXPIRED_JOBS =
+ key("historyserver.archive.clean-expired-jobs")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ Description.builder()
+ .text(
+                                        "Whether HistoryServer should clean up jobs that are no longer present in the archive directory defined by %s. ",
+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+ .linebreak()
+ .text(LEGACY_NOTE_MESSAGE)
+ .build());
public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
key(HISTORY_SERVER_RETAINED_JOBS_KEY)
@@ -164,6 +171,52 @@ public class HistoryServerOptions {
code("IllegalConfigurationException"))
.linebreak()
.text(NOTE_MESSAGE)
+ .list(
+ text(CONFIGURE_SINGLE_INSTANCE),
+ text(CONFIGURE_CONSISTENT))
+ .linebreak()
+ .text(LEGACY_NOTE_MESSAGE)
+ .build());
+
+ /**
+ * If this option is enabled then deleted application archives are also
deleted from
+ * HistoryServer.
+ */
+ public static final ConfigOption<Boolean>
HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS =
+ key("historyserver.archive.clean-expired-applications")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ Description.builder()
+ .text(
+                                        "Whether HistoryServer should clean up applications that are no longer present in the archive directory defined by %s. ",
+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+ .build());
+
+ public static final ConfigOption<Integer>
HISTORY_SERVER_RETAINED_APPLICATIONS =
+ key("historyserver.archive.retained-applications")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The maximum number of
applications to retain in each archive directory defined by %s. ",
+
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+ .text(
+ "This option works together with
the TTL (see %s). ",
+
code(HISTORY_SERVER_RETAINED_TTL_KEY))
+ .text(RETAINED_STRATEGY_MESSAGE)
+ .linebreak()
+ .text(
+ "If set to %s (default), there is
no limit to the number of archives. ",
+ code("-1"))
+ .text(
+ "If set to %s or less than %s,
HistoryServer will throw an %s. ",
+ code("0"),
+ code("-1"),
+
code("IllegalConfigurationException"))
+ .linebreak()
+ .text(NOTE_MESSAGE)
.list(
text(CONFIGURE_SINGLE_INSTANCE),
text(CONFIGURE_CONSISTENT))
@@ -176,18 +229,14 @@ public class HistoryServerOptions {
.withDescription(
Description.builder()
.text(
- "The time-to-live duration to
retain the jobs archived in each archive directory defined by %s. ",
+ "The time-to-live duration to
retain the archived entities (jobs and applications) in each archive directory
defined by %s. ",
code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
- .list(
- text(
- "If the option is not
specified without specifying %s, all of the jobs archives will be retained. ",
-
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
- text(
- "If the option is
specified without specifying %s, the jobs archive whose modification time in
the time-to-live duration will be retained. ",
-
code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
- text(
- "If this option is
specified as a positive time duration together with the %s option, the job
archive will be removed if its TTL has expired or the retained job count has
been reached. ",
-
code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
+ .text(
+ "This option works together with
the retention count limits (see %s and %s). ",
+
code(HISTORY_SERVER_RETAINED_APPLICATIONS.key()),
+
code(HISTORY_SERVER_RETAINED_JOBS_KEY))
+ .text(RETAINED_STRATEGY_MESSAGE)
+ .linebreak()
.text(
"If set to equal to or less than
%s milliseconds, HistoryServer will throw an %s. ",
code("0"),
code("IllegalConfigurationException"))
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 69d14e7fecb..95648156093 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -37,7 +37,7 @@ import
org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Runnables;
-import
org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
+import
org.apache.flink.runtime.webmonitor.history.retaining.CompositeArchiveRetainedStrategy;
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.ExceptionUtils;
@@ -93,6 +93,8 @@ import java.util.function.Consumer;
* <li>/config
* <li>/joboverview
* <li>/jobs/:jobid/*
+ * <li>/applications/overview
+ * <li>/applications/:applicationid/*
* </ul>
*
* <p>and relies on static files that are served by the {@link
@@ -110,8 +112,18 @@ public class HistoryServer {
private final long webRefreshIntervalMillis;
private final File webDir;
+ /**
+ * The archive fetcher is responsible for fetching job archives that are
not part of an
+ * application (legacy jobs created before application archiving was
introduced in FLINK-38761).
+ */
private final HistoryServerArchiveFetcher archiveFetcher;
+    /**
+     * The application archive fetcher is responsible for fetching application archives and their
+     * associated job archives.
+     */
+ private final HistoryServerApplicationArchiveFetcher
applicationArchiveFetcher;
+
@Nullable private final SSLHandlerFactory serverSSLFactory;
private WebFrontendBootstrap netty;
@@ -161,7 +173,7 @@ public class HistoryServer {
}
public HistoryServer(Configuration config) throws IOException,
FlinkException {
- this(config, (event) -> {});
+ this(config, (event) -> {}, (event) -> {});
}
/**
@@ -175,7 +187,9 @@ public class HistoryServer {
*/
public HistoryServer(
Configuration config,
- Consumer<HistoryServerArchiveFetcher.ArchiveEvent>
jobArchiveEventListener)
+ Consumer<HistoryServerArchiveFetcher.ArchiveEvent>
jobArchiveEventListener,
+ Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
+ applicationArchiveEventListener)
throws IOException, FlinkException {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(jobArchiveEventListener);
@@ -199,8 +213,10 @@ public class HistoryServer {
webDir = clearWebDir(config);
- boolean cleanupExpiredArchives =
+ boolean cleanupExpiredJobs =
config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS);
+ boolean cleanupExpiredApplications =
+
config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS);
String refreshDirectories =
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
if (refreshDirectories == null) {
@@ -235,8 +251,15 @@ public class HistoryServer {
refreshDirs,
webDir,
jobArchiveEventListener,
- cleanupExpiredArchives,
- CompositeJobRetainedStrategy.createFrom(config));
+ cleanupExpiredJobs,
+
CompositeArchiveRetainedStrategy.createForJobFromConfig(config));
+ applicationArchiveFetcher =
+ new HistoryServerApplicationArchiveFetcher(
+ refreshDirs,
+ webDir,
+ applicationArchiveEventListener,
+ cleanupExpiredApplications,
+
CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config));
this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -339,7 +362,11 @@ public class HistoryServer {
private Runnable getArchiveFetchingRunnable() {
return Runnables.withUncaughtExceptionHandler(
- () -> archiveFetcher.fetchArchives(),
FatalExitExceptionHandler.INSTANCE);
+ () -> {
+ archiveFetcher.fetchArchives();
+ applicationArchiveFetcher.fetchArchives();
+ },
+ FatalExitExceptionHandler.INSTANCE);
}
void stop() {
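
For orientation, a sketch of how the widened constructor is used, mirroring the test usage later in this commit. The listener bodies, the archiveDir value, and calling from the same package (as the tests do) are assumptions of the sketch; checked exceptions are left unhandled for brevity:

    Configuration config = new Configuration();
    config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, archiveDir); // e.g. a local or HDFS URI

    CountDownLatch jobArchivesCreated = new CountDownLatch(1);
    CountDownLatch applicationArchivesCreated = new CountDownLatch(1);

    HistoryServer historyServer =
            new HistoryServer(
                    config,
                    // called for legacy job archives handled by HistoryServerArchiveFetcher
                    jobEvent -> {
                        if (jobEvent.getType() == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
                            jobArchivesCreated.countDown();
                        }
                    },
                    // called for application archives handled by HistoryServerApplicationArchiveFetcher
                    applicationEvent -> {
                        if (applicationEvent.getType()
                                == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
                            applicationArchivesCreated.countDown();
                        }
                    });

Once running, the web frontend additionally serves /applications/overview and /applications/:applicationid/* from the fetched archives, next to the existing job endpoints.
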
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
new file mode 100644
index 00000000000..310e91b7753
--- /dev/null
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.ArchivePathUtils;
+import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import
org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
+import
org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
+import org.apache.flink.util.FileUtils;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the application
and job archives that
+ * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}.
The directories are
+ * polled in regular intervals, defined by {@link
+ * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analog to
the REST API.
+ *
+ * <p>Removes existing archives from these directories and the cache according
to {@link
+ * ArchiveRetainedStrategy} and {@link
+ * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}.
+ */
+public class HistoryServerApplicationArchiveFetcher extends
HistoryServerArchiveFetcher {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class);
+
+ private static final String APPLICATIONS_SUBDIR = "applications";
+ private static final String APPLICATION_OVERVIEWS_SUBDIR =
"application-overviews";
+
+ private final Map<Path, Map<String, Set<String>>>
cachedApplicationIdsToJobIds =
+ new HashMap<>();
+
+ private final File webApplicationDir;
+ private final File webApplicationsOverviewDir;
+
+ HistoryServerApplicationArchiveFetcher(
+ List<HistoryServer.RefreshLocation> refreshDirs,
+ File webDir,
+ Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
archiveEventListener,
+ boolean cleanupExpiredArchives,
+ ArchiveRetainedStrategy retainedStrategy)
+ throws IOException {
+ super(refreshDirs, webDir, archiveEventListener,
cleanupExpiredArchives, retainedStrategy);
+
+ for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+ cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new
HashMap<>());
+ }
+ this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR);
+ Files.createDirectories(webApplicationDir.toPath());
+ this.webApplicationsOverviewDir = new File(webDir,
APPLICATION_OVERVIEWS_SUBDIR);
+ Files.createDirectories(webApplicationsOverviewDir.toPath());
+ updateApplicationOverview();
+ }
+
+ @Override
+ List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir)
throws IOException {
+ List<FileStatus> applicationArchiveDirs = new ArrayList<>();
+ FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir);
+ if (clusterDirs == null) {
+ // the entire refreshDirectory was removed
+ return applicationArchiveDirs;
+ }
+
+        // Check for application archive directories located in the cluster directories and named
+        // according to the application ID format
+ for (FileStatus clusterDir : clusterDirs) {
+ if (clusterDir.isDir() &&
isValidId(clusterDir.getPath().getName(), refreshDir)) {
+ Path applicationsDir =
+ new Path(clusterDir.getPath(),
ArchivePathUtils.APPLICATIONS_DIR);
+ FileStatus[] applicationDirs =
refreshFS.listStatus(applicationsDir);
+ if (applicationDirs == null) {
+ // the entire applicationsDirectory was removed
+ return applicationArchiveDirs;
+ }
+
+ for (FileStatus applicationDir : applicationDirs) {
+ if (applicationDir.isDir()
+ && isValidId(applicationDir.getPath().getName(),
refreshDir)) {
+ applicationArchiveDirs.add(applicationDir);
+ }
+ }
+ }
+ }
+
+ return applicationArchiveDirs;
+ }
+
+ private boolean isValidId(String id, Path refreshDir) {
+ try {
+ ApplicationID.fromHexString(id);
+ return true;
+ } catch (IllegalArgumentException iae) {
+ LOG.debug(
+ "Archive directory {} contained file with unexpected name
{}. Ignoring file.",
+ refreshDir,
+ id,
+ iae);
+ return false;
+ }
+ }
+
+ @Override
+ List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path
refreshDir)
+ throws IOException {
+ FileSystem fs = archivePath.getFileSystem();
+ Path applicationArchive = new Path(archivePath,
ArchivePathUtils.APPLICATION_ARCHIVE_NAME);
+ if (!fs.exists(applicationArchive)) {
+ throw new IOException("Application archive " + applicationArchive
+ " does not exist.");
+ }
+
+ List<ArchiveEvent> events = new ArrayList<>();
+ events.add(processApplicationArchive(archiveId, applicationArchive));
+
+ Path jobArchivesDir = new Path(archivePath, ArchivePathUtils.JOBS_DIR);
+
+ List<FileStatus> jobArchives = listValidJobArchives(fs,
jobArchivesDir);
+ for (FileStatus jobArchive : jobArchives) {
+ String jobId = jobArchive.getPath().getName();
+ cachedApplicationIdsToJobIds
+ .get(refreshDir)
+ .computeIfAbsent(archiveId, k -> new HashSet<>())
+ .add(jobId);
+ events.add(processJobArchive(jobId, jobArchive.getPath()));
+ }
+
+ return events;
+ }
+
+ private ArchiveEvent processApplicationArchive(String applicationId, Path
applicationArchive)
+ throws IOException {
+ for (ArchivedJson archive :
FsJsonArchivist.readArchivedJsons(applicationArchive)) {
+ String path = archive.getPath();
+ String json = archive.getJson();
+
+ File target;
+ if (path.equals(ApplicationsOverviewHeaders.URL)) {
+ target = new File(webApplicationsOverviewDir, applicationId +
JSON_FILE_ENDING);
+ } else {
+ // this implicitly writes into webApplicationDir
+ target = new File(webDir, path + JSON_FILE_ENDING);
+ }
+
+ writeTargetFile(target, json);
+ }
+
+ return new ArchiveEvent(applicationId, ArchiveEventType.CREATED);
+ }
+
+ @Override
+ void deleteFromRemote(Path archive) throws IOException {
+ // delete application archive directory recursively (including all its
job archives)
+ archive.getFileSystem().delete(archive, true);
+ }
+
+ @Override
+ List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir)
{
+        LOG.info("Archive directory for application {} was deleted.", archiveId);
+ List<ArchiveEvent> deleteLog = new ArrayList<>();
+
+ deleteLog.add(deleteApplicationFiles(archiveId));
+
+ Set<String> jobIds =
cachedApplicationIdsToJobIds.get(refreshDir).remove(archiveId);
+ if (jobIds != null) {
+ jobIds.forEach(jobId -> deleteLog.add(deleteJobFiles(jobId)));
+ }
+
+ return deleteLog;
+ }
+
+ private ArchiveEvent deleteApplicationFiles(String applicationId) {
+ // Make sure we do not include this application in the overview
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationsOverviewDir, applicationId +
JSON_FILE_ENDING)
+ .toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from overview directory.", ioe);
+ }
+
+ // Clean up application files we may have created
+ File applicationDirectory = new File(webApplicationDir, applicationId);
+ try {
+ FileUtils.deleteDirectory(applicationDirectory);
+ } catch (IOException ioe) {
+ LOG.warn("Could not clean up application directory.", ioe);
+ }
+
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationDir, applicationId +
JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from application directory.", ioe);
+ }
+
+ return new ArchiveEvent(applicationId, ArchiveEventType.DELETED);
+ }
+
+ @Override
+ void updateOverview() {
+ updateApplicationOverview();
+ updateJobOverview();
+ }
+
+ /**
+ * This method replicates the JSON response that would be given by the
+ * ApplicationsOverviewHandler when listing applications.
+ *
+     * <p>Every application archive contains an overview entry with the same structure. Since
+     * applications are archived on their own, however, the list of applications only contains a
+     * single application.
+ *
+ * <p>For the display in the HistoryServer WebFrontend we have to combine
these overviews.
+ */
+ private void updateApplicationOverview() {
+ try (JsonGenerator gen =
+ jacksonFactory.createGenerator(
+ HistoryServer.createOrGetFile(webDir,
ApplicationsOverviewHeaders.URL))) {
+ File[] overviews = new
File(webApplicationsOverviewDir.getPath()).listFiles();
+ if (overviews != null) {
+ Collection<ApplicationDetails> allApplications = new
ArrayList<>(overviews.length);
+ for (File overview : overviews) {
+ MultipleApplicationsDetails subApplications =
+ mapper.readValue(overview,
MultipleApplicationsDetails.class);
+ allApplications.addAll(subApplications.getApplications());
+ }
+ mapper.writeValue(gen, new
MultipleApplicationsDetails(allApplications));
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to update application overview.", ioe);
+ }
+ }
+}
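
To make listValidArchives and processArchive above easier to follow, this is the remote layout they appear to expect under each entry of historyserver.archive.fs.dir. The directory names come from ArchivePathUtils constants whose values are not part of this diff, so they are kept symbolic, and clusterId/applicationId/jobId are placeholder strings assumed to be in scope:

    // <refreshDir>/                                  one entry of historyserver.archive.fs.dir
    //   <clusterId>/                                 directory named by a hex ID
    //     <ArchivePathUtils.APPLICATIONS_DIR>/
    //       <applicationId>/                         one directory per archived application
    //         <ArchivePathUtils.APPLICATION_ARCHIVE_NAME>   the application's archived JSON
    //         <ArchivePathUtils.JOBS_DIR>/
    //           <jobId>                              one job archive per job of the application
    Path applicationDir =
            new Path(
                    new Path(new Path(refreshDir, clusterId), ArchivePathUtils.APPLICATIONS_DIR),
                    applicationId);
    Path applicationArchive = new Path(applicationDir, ArchivePathUtils.APPLICATION_ARCHIVE_NAME);
    Path jobArchive = new Path(new Path(applicationDir, ArchivePathUtils.JOBS_DIR), jobId);

Legacy job archives, by contrast, remain plain files directly under the refresh directory and are still handled by HistoryServerArchiveFetcher.
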
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 4fe8bd58d5b..59eb258b3ed 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.history.FsJsonArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import
org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
+import
org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -48,7 +48,6 @@ import java.io.StringWriter;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -68,34 +67,31 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>The archives are downloaded and expanded into a file structure analog to
the REST API.
*
- * <p>Removes existing archives from these directories and the cache if
configured by {@link
- * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS} or {@link
- * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS}.
+ * <p>Removes existing archives from these directories and the cache according
to {@link
+ * ArchiveRetainedStrategy} and {@link
HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_JOBS}.
*/
class HistoryServerArchiveFetcher {
- /** Possible job archive operations in history-server. */
+ /** Possible archive operations in history-server. */
public enum ArchiveEventType {
- /** Job archive was found in one refresh location and created in
history server. */
+ /** Archive was found in one refresh location and created in history
server. */
CREATED,
- /**
- * Job archive was deleted from one of refresh locations and deleted
from history server.
- */
+        /** Archive was deleted from one of the refresh locations and deleted from history server. */
DELETED
}
- /** Representation of job archive event. */
+ /** Representation of archive event. */
public static class ArchiveEvent {
- private final String jobID;
+ private final String id;
private final ArchiveEventType operation;
- ArchiveEvent(String jobID, ArchiveEventType operation) {
- this.jobID = jobID;
+ ArchiveEvent(String id, ArchiveEventType operation) {
+ this.id = id;
this.operation = operation;
}
- public String getJobID() {
- return jobID;
+ public String getId() {
+ return id;
}
public ArchiveEventType getType() {
@@ -105,48 +101,50 @@ class HistoryServerArchiveFetcher {
private static final Logger LOG =
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
- private static final JsonFactory jacksonFactory = new JsonFactory();
- private static final ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper();
+ protected static final String JSON_FILE_ENDING = ".json";
+ protected static final String JOBS_SUBDIR = "jobs";
+ protected static final String JOB_OVERVIEWS_SUBDIR = "overviews";
- private static final String JSON_FILE_ENDING = ".json";
+ protected final JsonFactory jacksonFactory = new JsonFactory();
+ protected final ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper();
- private final List<HistoryServer.RefreshLocation> refreshDirs;
- private final Consumer<ArchiveEvent> jobArchiveEventListener;
- private final boolean processExpiredArchiveDeletion;
- private final JobRetainedStrategy jobRetainedStrategy;
+ protected final List<HistoryServer.RefreshLocation> refreshDirs;
+ protected final Consumer<ArchiveEvent> archiveEventListener;
+ protected final boolean processExpiredArchiveDeletion;
+ protected final ArchiveRetainedStrategy retainedStrategy;
- /** Cache of all available jobs identified by their id. */
- private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
+ /** Cache of all available archives identified by their id. */
+ protected final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
- private final File webDir;
- private final File webJobDir;
- private final File webOverviewDir;
+ protected final File webDir;
+ protected final File webJobDir;
+ protected final File webOverviewDir;
HistoryServerArchiveFetcher(
List<HistoryServer.RefreshLocation> refreshDirs,
File webDir,
- Consumer<ArchiveEvent> jobArchiveEventListener,
+ Consumer<ArchiveEvent> archiveEventListener,
boolean cleanupExpiredArchives,
- JobRetainedStrategy jobRetainedStrategy)
+ ArchiveRetainedStrategy retainedStrategy)
throws IOException {
this.refreshDirs = checkNotNull(refreshDirs);
- this.jobArchiveEventListener = jobArchiveEventListener;
+ this.archiveEventListener = archiveEventListener;
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
- this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
+ this.retainedStrategy = checkNotNull(retainedStrategy);
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new
HashSet<>());
}
this.webDir = checkNotNull(webDir);
- this.webJobDir = new File(webDir, "jobs");
+ this.webJobDir = new File(webDir, JOBS_SUBDIR);
Files.createDirectories(webJobDir.toPath());
- this.webOverviewDir = new File(webDir, "overviews");
+ this.webOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR);
Files.createDirectories(webOverviewDir.toPath());
- updateJobOverview(webOverviewDir, webDir);
+ updateJobOverview();
if (LOG.isInfoEnabled()) {
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
- LOG.info("Monitoring directory {} for archived jobs.",
refreshDir.getPath());
+ LOG.info("Monitoring directory {} for archives.",
refreshDir.getPath());
}
}
}
@@ -155,100 +153,102 @@ class HistoryServerArchiveFetcher {
try {
LOG.debug("Starting archive fetching.");
List<ArchiveEvent> events = new ArrayList<>();
- Map<Path, Set<String>> jobsToRemove = new HashMap<>();
+ Map<Path, Set<String>> archivesToRemove = new HashMap<>();
cachedArchivesPerRefreshDirectory.forEach(
- (path, archives) -> jobsToRemove.put(path, new
HashSet<>(archives)));
+ (path, archives) -> archivesToRemove.put(path, new
HashSet<>(archives)));
Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
Path refreshDir = refreshLocation.getPath();
LOG.debug("Checking archive directory {}.", refreshDir);
- // contents of /:refreshDir
- FileStatus[] jobArchives;
+ List<FileStatus> archives;
try {
- jobArchives = listArchives(refreshLocation.getFs(),
refreshDir);
+ archives = listValidArchives(refreshLocation.getFs(),
refreshDir);
+ archives.sort(
+
Comparator.comparingLong(FileStatus::getModificationTime).reversed());
} catch (IOException e) {
- LOG.error("Failed to access job archive location for path
{}.", refreshDir, e);
+ LOG.error("Failed to access archive location for path
{}.", refreshDir, e);
// something went wrong, potentially due to a concurrent
deletion
- // do not remove any jobs now; we will retry later
- jobsToRemove.remove(refreshDir);
+ // do not remove any archives now; we will retry later
+ archivesToRemove.remove(refreshDir);
continue;
}
int fileOrderedIndexOnModifiedTime = 0;
- for (FileStatus jobArchive : jobArchives) {
- Path jobArchivePath = jobArchive.getPath();
- String jobID = jobArchivePath.getName();
- if (!isValidJobID(jobID, refreshDir)) {
- continue;
- }
-
- jobsToRemove.get(refreshDir).remove(jobID);
+ for (FileStatus archive : archives) {
+ Path archivePath = archive.getPath();
+ String archiveId = archivePath.getName();
+ archivesToRemove.get(refreshDir).remove(archiveId);
fileOrderedIndexOnModifiedTime++;
- if (!jobRetainedStrategy.shouldRetain(
- jobArchive, fileOrderedIndexOnModifiedTime)) {
+ if (!retainedStrategy.shouldRetain(archive,
fileOrderedIndexOnModifiedTime)) {
archivesBeyondRetainedLimit
.computeIfAbsent(refreshDir, ignored -> new
HashSet<>())
- .add(jobArchivePath);
+ .add(archivePath);
continue;
}
- if
(cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) {
+ if
(cachedArchivesPerRefreshDirectory.get(refreshDir).contains(archiveId)) {
LOG.trace(
- "Ignoring archive {} because it was already
fetched.",
- jobArchivePath);
+ "Ignoring archive {} because it was already
fetched.", archivePath);
} else {
- LOG.info("Processing archive {}.", jobArchivePath);
+ LOG.info("Processing archive {}.", archivePath);
try {
- processArchive(jobID, jobArchivePath);
- events.add(new ArchiveEvent(jobID,
ArchiveEventType.CREATED));
-
cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID);
- LOG.info("Processing archive {} finished.",
jobArchivePath);
+ events.addAll(processArchive(archiveId,
archivePath, refreshDir));
+
cachedArchivesPerRefreshDirectory.get(refreshDir).add(archiveId);
+ LOG.info("Processing archive {} finished.",
archivePath);
} catch (IOException e) {
LOG.error(
- "Failure while fetching/processing job
archive for job {}.",
- jobID,
- e);
- deleteJobFiles(jobID);
+ "Failure while fetching/processing archive
{}.", archiveId, e);
+ deleteCachedArchives(archiveId, refreshDir);
}
}
}
}
- if
(jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
+ if
(archivesToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
&& processExpiredArchiveDeletion) {
- events.addAll(cleanupExpiredJobs(jobsToRemove));
+ events.addAll(cleanupExpiredArchives(archivesToRemove));
}
if (!archivesBeyondRetainedLimit.isEmpty()) {
-
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
+
events.addAll(cleanupArchivesBeyondRetainedLimit(archivesBeyondRetainedLimit));
}
if (!events.isEmpty()) {
- updateJobOverview(webOverviewDir, webDir);
+ updateOverview();
}
- events.forEach(jobArchiveEventListener::accept);
+ events.forEach(archiveEventListener);
LOG.debug("Finished archive fetching.");
} catch (Exception e) {
- LOG.error("Critical failure while fetching/processing job
archives.", e);
+ LOG.error("Critical failure while fetching/processing archives.",
e);
}
}
- private static FileStatus[] listArchives(FileSystem refreshFS, Path
refreshDir)
+ List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir)
throws IOException {
+ return listValidJobArchives(refreshFS, refreshDir);
+ }
+
+ List<FileStatus> listValidJobArchives(FileSystem refreshFS, Path
refreshDir)
throws IOException {
+ List<FileStatus> jobArchives = new ArrayList<>();
// contents of /:refreshDir
- FileStatus[] jobArchives = refreshFS.listStatus(refreshDir);
- if (jobArchives == null) {
+ FileStatus[] archives = refreshFS.listStatus(refreshDir);
+ if (archives == null) {
// the entire refreshDirectory was removed
- return new FileStatus[0];
+ return jobArchives;
}
- Arrays.sort(
- jobArchives,
Comparator.comparingLong(FileStatus::getModificationTime).reversed());
+ // Check for job archive files located directly in the refresh
directory and named according
+ // to the job ID format
+ for (FileStatus archive : archives) {
+ if (!archive.isDir() && isValidJobId(archive.getPath().getName(),
refreshDir)) {
+ jobArchives.add(archive);
+ }
+ }
return jobArchives;
}
- private static boolean isValidJobID(String jobId, Path refreshDir) {
+ boolean isValidJobId(String jobId, Path refreshDir) {
try {
JobID.fromHexString(jobId);
return true;
@@ -262,98 +262,110 @@ class HistoryServerArchiveFetcher {
}
}
- private void processArchive(String jobID, Path jobArchive) throws
IOException {
+ List<ArchiveEvent> processArchive(String archiveId, Path archivePath, Path
refreshDir)
+ throws IOException {
+ return Collections.singletonList(processJobArchive(archiveId,
archivePath));
+ }
+
+ ArchiveEvent processJobArchive(String jobId, Path jobArchive) throws
IOException {
for (ArchivedJson archive :
FsJsonArchivist.readArchivedJsons(jobArchive)) {
String path = archive.getPath();
String json = archive.getJson();
File target;
if (path.equals(JobsOverviewHeaders.URL)) {
- target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+ target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
} else if (path.equals("/joboverview")) { // legacy path
LOG.debug("Migrating legacy archive {}", jobArchive);
json = convertLegacyJobOverview(json);
- target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+ target = new File(webOverviewDir, jobId + JSON_FILE_ENDING);
} else {
// this implicitly writes into webJobDir
target = new File(webDir, path + JSON_FILE_ENDING);
}
- java.nio.file.Path parent = target.getParentFile().toPath();
+ writeTargetFile(target, json);
+ }
- try {
- Files.createDirectories(parent);
- } catch (FileAlreadyExistsException ignored) {
- // there may be left-over directories from the previous
- // attempt
- }
+ return new ArchiveEvent(jobId, ArchiveEventType.CREATED);
+ }
+
+ void writeTargetFile(File target, String json) throws IOException {
+ java.nio.file.Path parent = target.getParentFile().toPath();
+
+ try {
+ Files.createDirectories(parent);
+ } catch (FileAlreadyExistsException ignored) {
+ // there may be left-over directories from the previous attempt
+ }
- java.nio.file.Path targetPath = target.toPath();
+ java.nio.file.Path targetPath = target.toPath();
- // We overwrite existing files since this may be another attempt
- // at fetching this archive.
- // Existing files may be incomplete/corrupt.
- Files.deleteIfExists(targetPath);
+ // We overwrite existing files since this may be another attempt
+ // at fetching this archive.
+ // Existing files may be incomplete/corrupt.
+ Files.deleteIfExists(targetPath);
- Files.createFile(target.toPath());
- try (FileWriter fw = new FileWriter(target)) {
- fw.write(json);
- fw.flush();
- }
+ Files.createFile(target.toPath());
+ try (FileWriter fw = new FileWriter(target)) {
+ fw.write(json);
+ fw.flush();
}
}
- private List<ArchiveEvent> cleanupJobsBeyondSizeLimit(
- Map<Path, Set<Path>> jobArchivesToRemove) {
- Map<Path, Set<String>> allJobIdsToRemoveFromOverview = new HashMap<>();
+ List<ArchiveEvent> cleanupArchivesBeyondRetainedLimit(Map<Path, Set<Path>>
archivesToRemove) {
+ Map<Path, Set<String>> allArchiveIdsToRemove = new HashMap<>();
- for (Map.Entry<Path, Set<Path>> pathSetEntry :
jobArchivesToRemove.entrySet()) {
- HashSet<String> jobIdsToRemoveFromOverview = new HashSet<>();
+ for (Map.Entry<Path, Set<Path>> pathSetEntry :
archivesToRemove.entrySet()) {
+ HashSet<String> archiveIdsToRemove = new HashSet<>();
for (Path archive : pathSetEntry.getValue()) {
- jobIdsToRemoveFromOverview.add(archive.getName());
+ archiveIdsToRemove.add(archive.getName());
try {
- archive.getFileSystem().delete(archive, false);
+ deleteFromRemote(archive);
} catch (IOException ioe) {
- LOG.warn("Could not delete old archive " + archive, ioe);
+ LOG.warn("Could not delete old archive {}", archive, ioe);
}
}
- allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(),
jobIdsToRemoveFromOverview);
+ allArchiveIdsToRemove.put(pathSetEntry.getKey(),
archiveIdsToRemove);
}
- return cleanupExpiredJobs(allJobIdsToRemoveFromOverview);
+ return cleanupExpiredArchives(allArchiveIdsToRemove);
}
- private List<ArchiveEvent> cleanupExpiredJobs(Map<Path, Set<String>>
jobsToRemove) {
+ void deleteFromRemote(Path archive) throws IOException {
+ archive.getFileSystem().delete(archive, false);
+ }
+ List<ArchiveEvent> cleanupExpiredArchives(Map<Path, Set<String>>
archivesToRemove) {
List<ArchiveEvent> deleteLog = new ArrayList<>();
- LOG.info("Archive directories for jobs {} were deleted.",
jobsToRemove);
- jobsToRemove.forEach(
- (refreshDir, archivesToRemove) -> {
-
cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archivesToRemove);
+ archivesToRemove.forEach(
+ (refreshDir, archives) -> {
+
cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(archives);
+ archives.forEach(
+ archiveId ->
+
deleteLog.addAll(deleteCachedArchives(archiveId, refreshDir)));
});
- jobsToRemove.values().stream()
- .flatMap(Set::stream)
- .forEach(
- removedJobID -> {
- deleteJobFiles(removedJobID);
- deleteLog.add(new ArchiveEvent(removedJobID,
ArchiveEventType.DELETED));
- });
return deleteLog;
}
- private void deleteJobFiles(String jobID) {
+ List<ArchiveEvent> deleteCachedArchives(String archiveId, Path refreshDir)
{
+        LOG.info("Archive directory for job {} was deleted.", archiveId);
+ return Collections.singletonList(deleteJobFiles(archiveId));
+ }
+
+ ArchiveEvent deleteJobFiles(String jobId) {
// Make sure we do not include this job in the overview
try {
- Files.deleteIfExists(new File(webOverviewDir, jobID +
JSON_FILE_ENDING).toPath());
+ Files.deleteIfExists(new File(webOverviewDir, jobId +
JSON_FILE_ENDING).toPath());
} catch (IOException ioe) {
LOG.warn("Could not delete file from overview directory.", ioe);
}
// Clean up job files we may have created
- File jobDirectory = new File(webJobDir, jobID);
+ File jobDirectory = new File(webJobDir, jobId);
try {
FileUtils.deleteDirectory(jobDirectory);
} catch (IOException ioe) {
@@ -361,13 +373,15 @@ class HistoryServerArchiveFetcher {
}
try {
- Files.deleteIfExists(new File(webJobDir, jobID +
JSON_FILE_ENDING).toPath());
+ Files.deleteIfExists(new File(webJobDir, jobId +
JSON_FILE_ENDING).toPath());
} catch (IOException ioe) {
LOG.warn("Could not delete file from job directory.", ioe);
}
+
+ return new ArchiveEvent(jobId, ArchiveEventType.DELETED);
}
- private static String convertLegacyJobOverview(String legacyOverview)
throws IOException {
+ private String convertLegacyJobOverview(String legacyOverview) throws
IOException {
JsonNode root = mapper.readTree(legacyOverview);
JsonNode finishedJobs = root.get("finished");
JsonNode job = finishedJobs.get(0);
@@ -436,6 +450,10 @@ class HistoryServerArchiveFetcher {
return sw.toString();
}
+ void updateOverview() {
+ updateJobOverview();
+ }
+
/**
* This method replicates the JSON response that would be given by the
JobsOverviewHandler when
* listing both running and finished jobs.
@@ -445,7 +463,7 @@ class HistoryServerArchiveFetcher {
*
* <p>For the display in the HistoryServer WebFrontend we have to combine
these overviews.
*/
- private static void updateJobOverview(File webOverviewDir, File webDir) {
+ void updateJobOverview() {
try (JsonGenerator gen =
jacksonFactory.createGenerator(
HistoryServer.createOrGetFile(webDir,
JobsOverviewHeaders.URL))) {
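
With this change, HistoryServerArchiveFetcher effectively becomes a small template: fetchArchives() keeps the polling, caching, and retention loop, while the package-private hooks (listValidArchives, processArchive, deleteCachedArchives, deleteFromRemote, updateOverview) are what HistoryServerApplicationArchiveFetcher overrides above. A hypothetical, do-nothing subclass just to make that extension surface visible; the class name is invented, and a real fetcher would live in the same package and override more of the hooks:

    package org.apache.flink.runtime.webmonitor.history;

    import org.apache.flink.core.fs.FileStatus;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Consumer;

    /** Hypothetical fetcher sketch: it never finds an archive, so nothing is fetched or deleted. */
    class NoopArchiveFetcher extends HistoryServerArchiveFetcher {

        NoopArchiveFetcher(
                List<HistoryServer.RefreshLocation> refreshDirs,
                File webDir,
                Consumer<ArchiveEvent> archiveEventListener,
                boolean cleanupExpiredArchives,
                ArchiveRetainedStrategy retainedStrategy)
                throws IOException {
            super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy);
        }

        // processArchive, deleteCachedArchives, deleteFromRemote and updateOverview keep the
        // job-archive defaults from the superclass; override them for a different archive layout.
        @Override
        List<FileStatus> listValidArchives(FileSystem refreshFS, Path refreshDir) {
            return Collections.emptyList(); // nothing qualifies as an archive in this sketch
        }
    }
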
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
similarity index 96%
rename from
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
rename to
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
index 2ef991698bf..f2e50dd73c5 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.history.retaining;
import org.apache.flink.core.fs.FileStatus;
/** To define the strategy interface to judge whether the file should be
retained. */
-public interface JobRetainedStrategy {
+public interface ArchiveRetainedStrategy {
/**
* Judge whether the file should be retained.
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
similarity index 63%
rename from
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
rename to
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
index acd35c93f79..2a38dfaeff7 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
@@ -31,23 +31,42 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.apache.flink.util.Preconditions.checkArgument;
/** The retained strategy. */
-public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+public class CompositeArchiveRetainedStrategy implements
ArchiveRetainedStrategy {
- public static JobRetainedStrategy createFrom(ReadableConfig config) {
+ public static ArchiveRetainedStrategy
createForJobFromConfig(ReadableConfig config) {
int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+ if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1",
HISTORY_SERVER_RETAINED_JOBS.key());
+ }
Optional<Duration> retainedTtlOpt =
config.getOptional(HISTORY_SERVER_RETAINED_TTL);
- return new CompositeJobRetainedStrategy(
- new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
- new
TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey),
+ new
TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
}
- private final List<JobRetainedStrategy> strategies;
+ public static ArchiveRetainedStrategy
createForApplicationFromConfig(ReadableConfig config) {
+ int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS);
+ if (maxHistorySize == 0 || maxHistorySize < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1",
+ HISTORY_SERVER_RETAINED_APPLICATIONS.key());
+ }
+ Optional<Duration> retainedTtlOpt =
config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySize),
+ new
TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
+ }
+
+ private final List<ArchiveRetainedStrategy> strategies;
- CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+ CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) {
this.strategies =
strategies == null || strategies.length == 0
? Collections.emptyList()
@@ -64,11 +83,11 @@ public class CompositeJobRetainedStrategy implements
JobRetainedStrategy {
}
/** The time to live based retained strategy. */
-class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy {
@Nullable private final Duration ttlThreshold;
- TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+ TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) {
if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than 0 milliseconds",
@@ -86,16 +105,13 @@ class TimeToLiveJobRetainedStrategy implements
JobRetainedStrategy {
}
}
-/** The job quantity based retained strategy. */
-class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+/** The quantity based retained strategy. */
+class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy {
private final int quantityThreshold;
- QuantityJobRetainedStrategy(int quantityThreshold) {
- if (quantityThreshold == 0 || quantityThreshold < -1) {
- throw new IllegalConfigurationException(
- "Cannot set %s to 0 or less than -1",
HISTORY_SERVER_RETAINED_JOBS.key());
- }
+ QuantityArchiveRetainedStrategy(int quantityThreshold) {
+ checkArgument(quantityThreshold == -1 || quantityThreshold > 0);
this.quantityThreshold = quantityThreshold;
}
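
A short usage sketch of the renamed factory; conf is a Configuration holding the retained-applications and retained-ttl settings (as in the earlier sketch), and fileStatus/position stand for what fetchArchives() passes in: the archive's FileStatus and its 1-based rank when ordered by modification time, newest first:

    ArchiveRetainedStrategy strategy =
            CompositeArchiveRetainedStrategy.createForApplicationFromConfig(conf);

    // Per the option descriptions, an archive is retained only while every component strategy
    // retains it: here, while it is both within the retained-applications count and younger
    // than the configured TTL.
    boolean keep = strategy.shouldRetain(fileStatus, position);

Note that the validation rejecting 0 and values below -1 now happens in the createFor*FromConfig factories (throwing IllegalConfigurationException), while QuantityArchiveRetainedStrategy itself only guards its argument with checkArgument.
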
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 537d58fae8e..c67a3147628 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,19 +18,32 @@
package org.apache.flink.runtime.webmonitor.history;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.history.ArchivePathUtils;
import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import
org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import
org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +64,8 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
@@ -58,11 +73,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -86,13 +104,14 @@ class HistoryServerTest {
private MiniClusterWithClientResource cluster;
private File jmDirectory;
private File hsDirectory;
+ private Configuration clusterConfig;
@BeforeEach
void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws
Exception {
this.jmDirectory = jmDirectory;
this.hsDirectory = hsDirectory;
- Configuration clusterConfig = new Configuration();
+ clusterConfig = new Configuration();
clusterConfig.set(JobManagerOptions.ARCHIVE_DIR,
jmDirectory.toURI().toString());
cluster =
@@ -130,6 +149,9 @@ class HistoryServerTest {
==
HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
numExpectedArchivedJobs.countDown();
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -164,10 +186,10 @@ class HistoryServerTest {
final int numArchivesToRemoveUponHsStart =
numArchivesBeforeHsStarted - numArchivesToKeepInHistory;
final long oneMinuteSinceEpoch = 1000L * 60L;
- List<String> expectedJobIdsToKeep = new LinkedList<>();
+ List<JobID> expectedJobIdsToKeep = new LinkedList<>();
for (int j = 0; j < numArchivesBeforeHsStarted; j++) {
- String jobId =
+ JobID jobId =
createLegacyArchive(
jmDirectory.toPath(), j * oneMinuteSinceEpoch,
versionLessThan14);
if (j >= numArchivesToRemoveUponHsStart) {
@@ -205,6 +227,9 @@ class HistoryServerTest {
numArchivesDeletedTotal.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -232,10 +257,9 @@ class HistoryServerTest {
}
}
- private Set<String> getIdsFromJobOverview(String baseUrl) throws Exception
{
+ private Set<JobID> getIdsFromJobOverview(String baseUrl) throws Exception {
return getJobsOverview(baseUrl).getJobs().stream()
.map(JobDetails::getJobId)
- .map(JobID::toString)
.collect(Collectors.toSet());
}
@@ -288,15 +312,26 @@ class HistoryServerTest {
new File(hsDirectory.toURI() +
"/overviews/dirtyEmptySubFile.json").createNewFile();
new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir();
new File(hsDirectory.toURI() +
"/jobs/dirtyEmptySubFile.json").createNewFile();
+ new File(hsDirectory.toURI() +
"/application-overviews/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() +
"/application-overviews/dirtyEmptySubFile.json")
+ .createNewFile();
+ new File(hsDirectory.toURI() +
"/applications/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() +
"/applications/dirtyEmptySubFile.json").createNewFile();
hs = new HistoryServer(historyServerConfig);
assertInitializedHistoryServerWebDir(hs.getWebDir());
}
private void assertInitializedHistoryServerWebDir(File historyWebDir) {
-
-
assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs");
+ assertThat(historyWebDir.list())
+ .containsExactlyInAnyOrder(
+ "overviews", "jobs", "application-overviews",
"applications");
assertThat(new File(historyWebDir,
"overviews")).exists().isDirectory().isEmptyDirectory();
assertThat(new File(historyWebDir,
"jobs").list()).containsExactly("overview.json");
+ assertThat(new File(historyWebDir, "application-overviews"))
+ .exists()
+ .isDirectory()
+ .isEmptyDirectory();
+ assertThat(new File(historyWebDir,
"applications").list()).containsExactly("overview.json");
}
private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws
Exception {
@@ -327,6 +362,9 @@ class HistoryServerTest {
allArchivesExpiredLatch.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not call");
});
try {
@@ -378,27 +416,33 @@ class HistoryServerTest {
assertThat(allArchivesExpiredLatch.await(10L,
TimeUnit.SECONDS)).isTrue();
- assertJobFilesCleanedUp(cleanupExpiredJobs);
+ assertFilesCleanedUp(cleanupExpiredJobs);
} finally {
hs.stop();
}
}
- private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted)
throws IOException {
+ private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws
IOException {
try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
- final List<Path> jobFiles =
+ final List<Path> applicationOrJobFiles =
paths.filter(path -> !path.equals(hsDirectory.toPath()))
.map(path -> hsDirectory.toPath().relativize(path))
.filter(path ->
!path.equals(Paths.get("config.json")))
.filter(path -> !path.equals(Paths.get("jobs")))
.filter(path -> !path.equals(Paths.get("jobs",
"overview.json")))
.filter(path ->
!path.equals(Paths.get("overviews")))
+ .filter(path ->
!path.equals(Paths.get("applications")))
+ .filter(
+ path ->
+ !path.equals(
+ Paths.get("applications",
"overview.json")))
+ .filter(path ->
!path.equals(Paths.get("application-overviews")))
.collect(Collectors.toList());
- if (jobFilesShouldBeDeleted) {
- assertThat(jobFiles).isEmpty();
+ if (filesShouldBeDeleted) {
+ assertThat(applicationOrJobFiles).isEmpty();
} else {
- assertThat(jobFiles).isNotEmpty();
+ assertThat(applicationOrJobFiles).isNotEmpty();
}
}
}
@@ -413,6 +457,13 @@ class HistoryServerTest {
}
private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
+ return createTestConfiguration(
+ cleanupExpiredJobs,
+
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue());
+ }
+
+ private Configuration createTestConfiguration(
+ boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) {
Configuration historyServerConfig = new Configuration();
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS,
jmDirectory.toURI().toString());
@@ -424,6 +475,9 @@ class HistoryServerTest {
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS,
cleanupExpiredJobs);
+ historyServerConfig.set(
+
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS,
+ cleanupExpiredApplications);
historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT,
0);
return historyServerConfig;
@@ -451,15 +505,15 @@ class HistoryServerTest {
env.execute();
}
- private static String createLegacyArchive(
+ private static JobID createLegacyArchive(
Path directory, long fileModifiedDate, boolean versionLessThan14)
throws IOException {
- String jobId = createLegacyArchive(directory, versionLessThan14);
- File jobArchive = directory.resolve(jobId).toFile();
+ JobID jobId = createLegacyArchive(directory, versionLessThan14);
+ File jobArchive = directory.resolve(jobId.toString()).toFile();
jobArchive.setLastModified(fileModifiedDate);
return jobId;
}
- private static String createLegacyArchive(Path directory, boolean
versionLessThan14)
+ private static JobID createLegacyArchive(Path directory, boolean
versionLessThan14)
throws IOException {
JobID jobId = JobID.generate();
@@ -504,7 +558,405 @@ class HistoryServerTest {
directory.toAbsolutePath().toString(),
jobId.toString()),
Collections.singleton(archivedJson));
- return jobId.toString();
+ return jobId;
+ }
+
+ @Test
+ void testApplicationAndJobArchives() throws Exception {
+ int numApplications = 2;
+ int numJobsPerApplication = 2;
+ // jobs that are not part of an application
+ int numJobsOutsideApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication =
mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId =
archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new
HashSet<>(jobIds));
+ }
+ Set<JobID> expectedJobIdsOutsideApplication = new
HashSet<>(numJobsOutsideApplication);
+ for (int i = 0; i < numJobsOutsideApplication; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+ mockJobArchive(executionGraphInfo, null);
+
expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId());
+ }
+
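+ // each application and each job (whether inside or outside an application) produces one CREATED archive event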
+ int numTotalJobs = numApplications * numJobsPerApplication +
numJobsOutsideApplication;
+ int numTotal = numApplications + numTotalJobs;
+ CountDownLatch numExpectedArchives = new CountDownLatch(numTotal);
+ Configuration historyServerConfig = createTestConfiguration(false);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ if (event.getType()
+ ==
HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
+ numExpectedArchives.countDown();
+ }
+ },
+ (event) -> {
+ if (event.getType()
+ ==
HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ Set<JobID> expectedJobIds =
+ Stream.concat(
+
expectedApplicationAndJobIds.values().stream()
+ .flatMap(Set::stream),
+ expectedJobIdsOutsideApplication.stream())
+ .collect(Collectors.toSet());
+
assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds);
+ // checks whether the dashboard configuration contains all
expected fields
+ getDashboardConfiguration(baseUrl);
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws
Exception {
+ int numJobsPerApplication = 1;
+ int numApplicationsToKeepInHistory = 2;
+ int numApplicationsBeforeHsStarted = 4;
+ int numApplicationsAfterHsStarted = 2;
+ int numApplicationsToRemoveUponHsStart =
+ numApplicationsBeforeHsStarted -
numApplicationsToKeepInHistory;
+ List<Tuple2<ApplicationID, Set<JobID>>>
expectedApplicationAndJobIdsToKeep =
+ new LinkedList<>();
+ for (int i = 0; i < numApplicationsBeforeHsStarted; i++) {
+ ArchivedApplication archivedApplication =
mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId =
archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ if (i >= numApplicationsToRemoveUponHsStart) {
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ }
+ }
+
+ // one archive for the application itself, plus numJobsPerApplication for its jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numArchivesCreatedInitially =
+ new CountDownLatch(numApplicationsToKeepInHistory *
numArchivesRatio);
+ // jobs in applications that exceed the size limit are not read by the
fetcher at all,
+ // so there is no need to delete these jobs.
+ CountDownLatch numArchivesDeletedInitially =
+ new CountDownLatch(numApplicationsToRemoveUponHsStart);
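+ // applications evicted after startup have already been fully fetched, so each eviction also deletes their job archives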
+ CountDownLatch numArchivesCreatedTotal =
+ new CountDownLatch(
+ (numApplicationsBeforeHsStarted
+ - numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted)
+ * numArchivesRatio);
+ CountDownLatch numArchivesDeletedTotal =
+ new CountDownLatch(
+ numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted *
numArchivesRatio);
+ Configuration historyServerConfig =
+ createTestConfiguration(
+
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
+ numApplicationsToKeepInHistory);
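+ // only application archives exist in this test, so the legacy job-archive listener below must never fire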
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not call");
+ },
+ (event) -> {
+ if (event.getType()
+ ==
HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numArchivesCreatedInitially.countDown();
+ numArchivesCreatedTotal.countDown();
+ } else if (event.getType()
+ ==
HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ numArchivesDeletedInitially.countDown();
+ numArchivesDeletedTotal.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numArchivesCreatedInitially.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedInitially.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple
-> tuple.f1)));
+ for (int i = numApplicationsBeforeHsStarted;
+ i < numApplicationsBeforeHsStarted +
numApplicationsAfterHsStarted;
+ i++) {
+ expectedApplicationAndJobIdsToKeep.remove(0);
+ ArchivedApplication archivedApplication =
+ mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId =
archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ // avoid running too fast, which would result in archive files sharing the same creation time
+ Thread.sleep(50);
+ }
+
+ assertThat(numArchivesCreatedTotal.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedTotal.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple
-> tuple.f1)));
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsZero() {
+ assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() {
+ assertThatThrownBy(() ->
startHistoryServerWithApplicationSizeLimit(-2))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+ throws IOException, FlinkException, InterruptedException {
+ Configuration historyServerConfig =
+ createTestConfiguration(
+
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+ .defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
maxHistorySize);
+ new HistoryServer(historyServerConfig).start();
+ }
+
+ @Test
+ void testCleanExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(true);
+ }
+
+ @Test
+ void testRemainExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(false);
+ }
+
+ private void runApplicationArchiveExpirationTest(boolean
cleanupExpiredApplications)
+ throws Exception {
+ int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+ int numApplications = 3;
+ int numJobsPerApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication =
mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId =
archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new
HashSet<>(jobIds));
+ }
+
+ // one archive for the application itself, plus numJobsPerApplication for its jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numExpectedArchives = new
CountDownLatch(numApplications * numArchivesRatio);
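+ // when expired-application cleanup is disabled, no DELETED events are expected, so both expiry latches start at zero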
+ CountDownLatch firstArchiveExpiredLatch =
+ new CountDownLatch(numExpiredApplications * numArchivesRatio);
+ CountDownLatch allArchivesExpiredLatch =
+ new CountDownLatch(
+ cleanupExpiredApplications ? numApplications *
numArchivesRatio : 0);
+
+ Configuration historyServerConfig =
+ createTestConfiguration(
+
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+ cleanupExpiredApplications);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not call");
+ },
+ (event) -> {
+ if (event.getType()
+ ==
HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ } else if (event.getType()
+ ==
HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ firstArchiveExpiredLatch.countDown();
+ allArchivesExpiredLatch.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ ApplicationID applicationIdToDelete =
+ expectedApplicationAndJobIds.keySet().stream()
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected at least one
application"));
+ if (cleanupExpiredApplications) {
+ expectedApplicationAndJobIds.remove(applicationIdToDelete);
+ }
+ // trigger another fetch and delete one archive from jm
+ // we fetch again to probabilistically cause a concurrent deletion
+ hs.fetchArchives();
+ deleteApplicationArchiveDir(applicationIdToDelete);
+
+ assertThat(firstArchiveExpiredLatch.await(10L,
TimeUnit.SECONDS)).isTrue();
+ // check that archive is still/no longer present in hs
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ for (ApplicationID remainingApplicationId :
expectedApplicationAndJobIds.keySet()) {
+ deleteApplicationArchiveDir(remainingApplicationId);
+ }
+ assertThat(allArchivesExpiredLatch.await(10L,
TimeUnit.SECONDS)).isTrue();
+ assertFilesCleanedUp(cleanupExpiredApplications);
+ } finally {
+ hs.stop();
+ }
+ }
+
+ private Map<ApplicationID, Set<JobID>>
getApplicationAndJobIdsFromApplicationOverview(
+ String baseUrl) throws Exception {
+ Set<ApplicationID> applicationIds =
+ getApplicationsOverview(baseUrl).getApplications().stream()
+ .map(ApplicationDetails::getApplicationId)
+ .collect(Collectors.toSet());
+ Map<ApplicationID, Set<JobID>> applicationAndJobIds = new
HashMap<>(applicationIds.size());
+ for (ApplicationID applicationId : applicationIds) {
+ Set<JobID> jobIds =
+ getApplicationDetails(baseUrl,
applicationId).getJobs().stream()
+ .map(JobDetails::getJobId)
+ .collect(Collectors.toSet());
+ applicationAndJobIds.put(applicationId, jobIds);
+ }
+ return applicationAndJobIds;
+ }
+
+ private static MultipleApplicationsDetails getApplicationsOverview(String
baseUrl)
+ throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(baseUrl +
ApplicationsOverviewHeaders.URL);
+ return OBJECT_MAPPER.readValue(response.f1,
MultipleApplicationsDetails.class);
+ }
+
+ private static ApplicationDetailsInfo getApplicationDetails(
+ String baseUrl, ApplicationID applicationId) throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(
+ baseUrl
+ + ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY,
+ applicationId.toString()));
+ return OBJECT_MAPPER.readValue(response.f1,
ApplicationDetailsInfo.class);
+ }
+
+ private ArchivedApplication mockApplicationArchive(int numJobs) throws
IOException {
+ ArchivedApplication archivedApplication =
createArchivedApplication(numJobs);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ ArchivedJson archivedApplicationsOverview =
+ new ArchivedJson(
+ ApplicationsOverviewHeaders.URL,
+ new MultipleApplicationsDetails(
+ Collections.singleton(
+
ApplicationDetails.fromArchivedApplication(
+ archivedApplication))));
+ ArchivedJson archivedApplicationDetails =
+ new ArchivedJson(
+ ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY,
applicationId.toString()),
+
ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig,
applicationId),
+ Arrays.asList(archivedApplicationsOverview,
archivedApplicationDetails));
+
+ Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+ for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+ mockJobArchive(jobEntry.getValue(), applicationId);
+ }
+ return archivedApplication;
+ }
+
+ private void mockJobArchive(
+ ExecutionGraphInfo executionGraphInfo, @Nullable ApplicationID
applicationId)
+ throws IOException {
+ JobID jobId = executionGraphInfo.getJobId();
+ ArchivedJson archivedJobsOverview =
+ new ArchivedJson(
+ JobsOverviewHeaders.URL,
+ new MultipleJobsDetails(
+ Collections.singleton(
+ JobDetails.createDetailsForJob(
+
executionGraphInfo.getArchivedExecutionGraph()))));
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getJobArchivePath(clusterConfig, jobId,
applicationId),
+ Collections.singletonList(archivedJobsOverview));
+ }
+
+ private ArchivedApplication createArchivedApplication(int numJobs) {
+ ApplicationID applicationId = ApplicationID.generate();
+ Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+ for (int i = 0; i < numJobs; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo();
+ jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+ }
+ return new ArchivedApplication(
+ applicationId,
+ "test-application",
+ ApplicationState.FINISHED,
+ new long[ApplicationState.values().length],
+ jobs);
+ }
+
+ private ExecutionGraphInfo createExecutionGraphInfo() {
+ return new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+ JobID.generate(), "test-job", JobStatus.FINISHED,
null, null, null, 0));
+ }
+
+ private void deleteApplicationArchiveDir(ApplicationID applicationId)
throws IOException {
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ org.apache.flink.core.fs.Path applicationArchiveDir =
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig,
applicationId)
+ .getParent();
+ applicationArchiveDir.getFileSystem().delete(applicationArchiveDir,
true);
}
private static final class JsonObject implements AutoCloseable {
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df5..d1d6124b357 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
package org.apache.flink.runtime.webmonitor.history.retaining;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static
org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Testing for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
+ private static Stream<TestCase> getTestCases() {
+ return Stream.of(
+ new TestCase(
+ "Legacy Jobs",
+ HISTORY_SERVER_RETAINED_JOBS,
+
CompositeArchiveRetainedStrategy::createForJobFromConfig),
+ new TestCase(
+ "Applications",
+ HISTORY_SERVER_RETAINED_APPLICATIONS,
+
CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+ }
- @Test
- void testTimeToLiveBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
// Skipped for option value that is less than 0 milliseconds, which
will throw a
// java.lang.NumberFormatException caused by TimeUtils.
@@ -51,7 +69,7 @@ class CompositeJobRetainedStrategyTest {
// Test the case where no specific retention policy is configured,
i.e., all archived files
// are retained.
- JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -63,7 +81,7 @@ class CompositeJobRetainedStrategyTest {
// Test the case where TTL-based retention policies is specified only.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -74,35 +92,37 @@ class CompositeJobRetainedStrategyTest {
.isFalse();
}
- @Test
- void testQuantityBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), 0);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), -2);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+ conf.removeConfig(testCase.getQuantityConfigOption());
// Test the case where no specific retention policy is configured,
i.e., all archived files
// are retained.
- JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
// Test the case where QUANTITY-based retention policies is specified
only.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(),
3)).isFalse();
}
- @Test
- void testCompositeBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) {
final long outOfTtlMillis =
Instant.now().toEpochMilli() -
Duration.ofMinutes(2L).toMillis();
@@ -110,7 +130,7 @@ class CompositeJobRetainedStrategyTest {
// Test the case where no specific retention policy is configured,
i.e., all archived files
// are retained.
final Configuration conf = new Configuration();
- JobRetainedStrategy strategy =
CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(),
10)).isTrue();
assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 3)).isTrue();
@@ -118,8 +138,8 @@ class CompositeJobRetainedStrategyTest {
// Test the case where both retention policies are specified.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 1)).isFalse();
assertThat(strategy.shouldRetain(new TestingFileStatus(),
10)).isFalse();
assertThat(strategy.shouldRetain(new
TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ class CompositeJobRetainedStrategyTest {
return null;
}
}
+
+ private static final class TestCase {
+ private final String testName;
+ private final ConfigOption<Integer> quantityConfigOption;
+ private final Function<Configuration, ArchiveRetainedStrategy>
strategyFunction;
+
+ TestCase(
+ String testName,
+ ConfigOption<Integer> quantityConfigOption,
+ Function<Configuration, ArchiveRetainedStrategy>
strategyFunction) {
+ this.testName = testName;
+ this.quantityConfigOption = quantityConfigOption;
+ this.strategyFunction = strategyFunction;
+ }
+
+ ArchiveRetainedStrategy createStrategy(Configuration conf) {
+ return strategyFunction.apply(conf);
+ }
+
+ ConfigOption<Integer> getQuantityConfigOption() {
+ return quantityConfigOption;
+ }
+
+ @Override
+ public String toString() {
+ return testName;
+ }
+ }
}