[flink] branch release-1.8 updated: [FLINK-13892][hs] Harden HistoryServerTest

2019-09-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new d540794  [FLINK-13892][hs] Harden HistoryServerTest
d540794 is described below

commit d5407947f23f7a6a9e8a8dbc7d2e78ea6257b7a8
Author: Chesnay Schepler 
AuthorDate: Thu Aug 29 13:15:08 2019 +0200

[FLINK-13892][hs] Harden HistoryServerTest
---
 .../flink/runtime/webmonitor/history/HistoryServer.java  |  6 +++---
 .../webmonitor/history/HistoryServerArchiveFetcher.java  | 12 ++++++------
 .../flink/runtime/webmonitor/history/HistoryServerTest.java  |  7 ++++---
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index a93fe93..f1a5330 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -140,9 +140,9 @@ public class HistoryServer {
this(config, new CountDownLatch(0));
}
 
-   public HistoryServer(Configuration config, CountDownLatch 
numFinishedPolls) throws IOException, FlinkException {
+   public HistoryServer(Configuration config, CountDownLatch 
numArchivedJobs) throws IOException, FlinkException {
Preconditions.checkNotNull(config);
-   Preconditions.checkNotNull(numFinishedPolls);
+   Preconditions.checkNotNull(numArchivedJobs);
 
this.config = config;
if 
(config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && 
SSLUtils.isRestSSLEnabled(config)) {
@@ -187,7 +187,7 @@ public class HistoryServer {
}
 
long refreshIntervalMillis = 
config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
-   archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, 
numFinishedPolls);
+   archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, 
numArchivedJobs);
 
this.shutdownHook = ShutdownHookUtil.addShutdownHook(
HistoryServer.this::stop,
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index f95b14c..47888cd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -79,9 +79,9 @@ class HistoryServerArchiveFetcher {
private final JobArchiveFetcherTask fetcherTask;
private final long refreshIntervalMillis;
 
-   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
this.refreshIntervalMillis = refreshIntervalMillis;
-   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir, numFinishedPolls);
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir, numArchivedJobs);
if (LOG.isInfoEnabled()) {
for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
@@ -112,7 +112,7 @@ class HistoryServerArchiveFetcher {
static class JobArchiveFetcherTask extends TimerTask {
 
private final List<HistoryServer.RefreshLocation> refreshDirs;
-   private final CountDownLatch numFinishedPolls;
+   private final CountDownLatch numArchivedJobs;
 
/** Cache of all available jobs identified by their id. */
private final Set cachedArchives;
@@ -123,9 +123,9 @@ class HistoryServerArchiveFetcher {
 
private static final String JSON_FILE_ENDING = ".json";
 
-   JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+   JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
this.refreshDirs = checkNotNull(refreshDirs);
-   this.numFinishedPolls = numFinishedPolls;
+   this.numArchivedJobs = numArchivedJobs;
this.cachedArchives = new HashSet<>();

[flink] branch release-1.8 updated: [FLINK-13892][hs] Harden HistoryServerTest

2019-09-06 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 750648d  [FLINK-13892][hs] Harden HistoryServerTest
750648d is described below

commit 750648d4fb0dd83edf7e0a8462cc52118a98e484
Author: Chesnay Schepler 
AuthorDate: Thu Sep 5 13:40:06 2019 +0200

[FLINK-13892][hs] Harden HistoryServerTest
---
 .../runtime/webmonitor/history/HistoryServerArchiveFetcher.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 47888cd..fed220f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -153,6 +153,7 @@ class HistoryServerArchiveFetcher {
continue;
}
boolean updateOverview = false;
+   int numFetchedArchives = 0;
for (FileStatus jobArchive : 
jobArchives) {
Path jobArchivePath = 
jobArchive.getPath();
String jobID = 
jobArchivePath.getName();
@@ -200,7 +201,7 @@ class HistoryServerArchiveFetcher {
}
}
updateOverview 
= true;
-   
numArchivedJobs.countDown();
+   
numFetchedArchives++;
} catch (IOException e) 
{

LOG.error("Failure while fetching/processing job archive for job {}.", jobID, 
e);
// Make sure we 
attempt to fetch the archive again
@@ -224,6 +225,9 @@ class HistoryServerArchiveFetcher {
}
if (updateOverview) {

updateJobOverview(webOverviewDir, webDir);
+   for (int x = 0; x < 
numFetchedArchives; x++) {
+   
numArchivedJobs.countDown();
+   }
}
}
} catch (Exception e) {