This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 2f26f89  [FLINK-13892][hs] Harden HistoryServerTest
2f26f89 is described below

commit 2f26f894be9905efa5cc90e28479ef8d96a4fc8d
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Aug 29 13:15:08 2019 +0200

    [FLINK-13892][hs] Harden HistoryServerTest
---
 .../flink/runtime/webmonitor/history/HistoryServer.java      |  6 +++---
 .../webmonitor/history/HistoryServerArchiveFetcher.java      | 12 ++++++------
 .../flink/runtime/webmonitor/history/HistoryServerTest.java  |  7 ++++---
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index e907836..37407bb 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -137,9 +137,9 @@ public class HistoryServer {
                this(config, new CountDownLatch(0));
        }
 
-       public HistoryServer(Configuration config, CountDownLatch 
numFinishedPolls) throws IOException, FlinkException {
+       public HistoryServer(Configuration config, CountDownLatch 
numArchivedJobs) throws IOException, FlinkException {
                Preconditions.checkNotNull(config);
-               Preconditions.checkNotNull(numFinishedPolls);
+               Preconditions.checkNotNull(numArchivedJobs);
 
                this.config = config;
                if (HistoryServerUtils.isSSLEnabled(config)) {
@@ -184,7 +184,7 @@ public class HistoryServer {
                }
 
                long refreshIntervalMillis = 
config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
-               archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, 
numFinishedPolls);
+               archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, 
numArchivedJobs);
 
                this.shutdownHook = ShutdownHookUtil.addShutdownHook(
                        HistoryServer.this::stop,
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index f95b14c..47888cd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -79,9 +79,9 @@ class HistoryServerArchiveFetcher {
        private final JobArchiveFetcherTask fetcherTask;
        private final long refreshIntervalMillis;
 
-       HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch 
numFinishedPolls) {
+       HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch 
numArchivedJobs) {
                this.refreshIntervalMillis = refreshIntervalMillis;
-               this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir, numFinishedPolls);
+               this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir, numArchivedJobs);
                if (LOG.isInfoEnabled()) {
                        for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
                                LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
@@ -112,7 +112,7 @@ class HistoryServerArchiveFetcher {
        static class JobArchiveFetcherTask extends TimerTask {
 
                private final List<HistoryServer.RefreshLocation> refreshDirs;
-               private final CountDownLatch numFinishedPolls;
+               private final CountDownLatch numArchivedJobs;
 
                /** Cache of all available jobs identified by their id. */
                private final Set<String> cachedArchives;
@@ -123,9 +123,9 @@ class HistoryServerArchiveFetcher {
 
                private static final String JSON_FILE_ENDING = ".json";
 
-               JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> 
refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+               JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> 
refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
                        this.refreshDirs = checkNotNull(refreshDirs);
-                       this.numFinishedPolls = numFinishedPolls;
+                       this.numArchivedJobs = numArchivedJobs;
                        this.cachedArchives = new HashSet<>();
                        this.webDir = checkNotNull(webDir);
                        this.webJobDir = new File(webDir, "jobs");
@@ -200,6 +200,7 @@ class HistoryServerArchiveFetcher {
                                                                        }
                                                                }
                                                                updateOverview 
= true;
+                                                               
numArchivedJobs.countDown();
                                                        } catch (IOException e) 
{
                                                                
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, 
e);
                                                                // Make sure we 
attempt to fetch the archive again
@@ -228,7 +229,6 @@ class HistoryServerArchiveFetcher {
                        } catch (Exception e) {
                                LOG.error("Critical failure while 
fetching/processing job archives.", e);
                        }
-                       numFinishedPolls.countDown();
                }
        }
 
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a1ace56..fa932dc 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -116,11 +116,12 @@ public class HistoryServerTest extends TestLogger {
                }
                createLegacyArchive(jmDirectory.toPath());
 
-               CountDownLatch numFinishedPolls = new CountDownLatch(1);
+               CountDownLatch numExpectedArchivedJobs = new 
CountDownLatch(numJobs + 1);
 
                Configuration historyServerConfig = new Configuration();
                
historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, 
jmDirectory.toURI().toString());
                
historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, 
hsDirectory.getAbsolutePath());
+               
historyServerConfig.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL,
 100L);
 
                
historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
@@ -131,11 +132,11 @@ public class HistoryServerTest extends TestLogger {
                        archives = jmDirectory.listFiles();
                }
 
-               HistoryServer hs = new HistoryServer(historyServerConfig, 
numFinishedPolls);
+               HistoryServer hs = new HistoryServer(historyServerConfig, 
numExpectedArchivedJobs);
                try {
                        hs.start();
                        String baseUrl = "http://localhost:"; + hs.getWebPort();
-                       numFinishedPolls.await(10L, TimeUnit.SECONDS);
+                       numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
 
                        ObjectMapper mapper = new ObjectMapper();
                        String response = getFromHTTP(baseUrl + 
JobsOverviewHeaders.URL);

Reply via email to