[FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to Dispatcher

The ArchivedExecutionGraphStore is responsible for storing completed jobs
for historical job requests (e.g. from the web UI or from the client). The store
is populated by the Dispatcher once a job has terminated.

The FileArchivedExecutionGraphStore implementation persists all
ArchivedExecutionGraphs on disk in order to avoid OOM problems. It only keeps
some of the stored graphs in memory until it reaches a configurable size. Once
it comes close to this size, it will evict the elements and only reload them if
requested again. Additionally, the FileArchivedExecutionGraphStore defines
an expiration time after which the execution graphs will be removed from disk.
This prevents excessive use of disk resources.

This closes #5310.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b817f0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b817f0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b817f0f

Branch: refs/heads/master
Commit: 8b817f0f9f0ec55f040b56f2d65c62761eac1ac1
Parents: 60b7b03
Author: Till Rohrmann <[email protected]>
Authored: Fri Jan 19 14:20:03 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:50:11 2018 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |   4 +
 .../flink/configuration/JobManagerOptions.java  |  17 +
 .../apache/flink/runtime/blob/BlobUtils.java    |   2 +-
 .../dispatcher/ArchivedExecutionGraphStore.java |  74 ++++
 .../flink/runtime/dispatcher/Dispatcher.java    |  56 ++-
 .../FileArchivedExecutionGraphStore.java        | 301 +++++++++++++++++
 .../dispatcher/StandaloneDispatcher.java        |   2 +
 .../entrypoint/SessionClusterEntrypoint.java    |  40 +++
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +-
 .../messages/webmonitor/ClusterOverview.java    |  51 +--
 .../messages/webmonitor/JobsOverview.java       |  49 ++-
 .../ManuallyTriggeredScheduledExecutor.java     | 163 +++++++++
 .../runtime/dispatcher/DispatcherTest.java      |   3 +
 .../FileArchivedExecutionGraphStoreTest.java    | 337 +++++++++++++++++++
 .../MemoryArchivedExecutionGraphStore.java      |  80 +++++
 15 files changed, 1130 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 49e0a35..8a4b1c1 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -242,6 +242,10 @@ Default value is `1 minute`.
 - `restart-strategy.failure-rate.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "failure-rate".
 Default value is the `akka.ask.timeout`.
 
+- `jobstore.cache-size`: The job store cache size in bytes which is used to 
keep completed jobs in memory (DEFAULT: `52428800` (`50` MB)).
+
+- `jobstore.expiration-time`: The time in seconds after which a completed job 
expires and is purged from the job store (DEFAULT: `3600`).
+
 ## Full Reference
 
 ### HDFS

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1e22a24..9f61736 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -96,6 +96,23 @@ public class JobManagerOptions {
                key("jobmanager.archive.fs.dir")
                        .noDefaultValue();
 
+       /**
+        * The job store cache size in bytes which is used to keep completed
+        * jobs in memory.
+        */
+       public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
+               key("jobstore.cache-size")
+               .defaultValue(50L * 1024L * 1024L)
+               .withDescription("The job store cache size in bytes which is 
used to keep completed jobs in memory.");
+
+       /**
+        * The time in seconds after which a completed job expires and is 
purged from the job store.
+        */
+       public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
+               key("jobstore.expiration-time")
+               .defaultValue(60L * 60L)
+               .withDescription("The time in seconds after which a completed 
job expires and is purged from the job store.");
+
        // 
---------------------------------------------------------------------------------------------
 
        private JobManagerOptions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 04d3366..3273e1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -306,7 +306,7 @@ public class BlobUtils {
        /**
         * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
         */
-       static Thread addShutdownHook(final Closeable service, final Logger 
logger) {
+       public static Thread addShutdownHook(final Closeable service, final 
Logger logger) {
                checkNotNull(service);
                checkNotNull(logger);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
new file mode 100644
index 0000000..6f5df53
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for a {@link ArchivedExecutionGraph} store.
+ */
+public interface ArchivedExecutionGraphStore extends Closeable {
+
+       /**
+        * Returns the current number of stored {@link ArchivedExecutionGraph}.
+        *
+        * @return Current number of stored {@link ArchivedExecutionGraph}
+        */
+       int size();
+
+       /**
+        * Get the {@link ArchivedExecutionGraph} for the given job id. Null if 
it isn't stored.
+        *
+        * @param jobId identifying the serializable execution graph to retrieve
+        * @return The stored serializable execution graph or null
+        */
+       @Nullable
+       ArchivedExecutionGraph get(JobID jobId);
+
+       /**
+        * Store the given {@link ArchivedExecutionGraph} in the store.
+        *
+        * @param archivedExecutionGraph to store
+        * @throws IOException if the serializable execution graph could not be 
stored in the store
+        */
+       void put(ArchivedExecutionGraph archivedExecutionGraph) throws 
IOException;
+
+       /**
+        * Return the {@link JobsOverview} for all stored/past jobs.
+        *
+        * @return Jobs overview for all stored/past jobs
+        */
+       JobsOverview getStoredJobsOverview();
+
+       /**
+        * Return the collection of {@link JobDetails} of all currently stored 
jobs.
+        *
+        * @return Collection of job details of all currently stored jobs
+        */
+       Collection<JobDetails> getAvailableJobDetails();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index a5c8961..e930450 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -47,6 +46,7 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -102,6 +102,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        private final JobExecutionResultCache jobExecutionResultCache = new 
JobExecutionResultCache();
 
+       private final ArchivedExecutionGraphStore archivedExecutionGraphStore;
+
        @Nullable
        protected final String restAddress;
 
@@ -114,6 +116,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress) throws Exception {
                super(rpcService, endpointId);
@@ -136,6 +139,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                leaderElectionService = 
highAvailabilityServices.getDispatcherLeaderElectionService();
 
                this.restAddress = restAddress;
+
+               this.archivedExecutionGraphStore = 
Preconditions.checkNotNull(archivedExecutionGraphStore);
        }
 
        //------------------------------------------------------
@@ -307,10 +312,15 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                CompletableFuture<Collection<JobStatus>> allJobsFuture = 
FutureUtils.combineAll(jobStatus);
 
+               final JobsOverview completedJobsOverview = 
archivedExecutionGraphStore.getStoredJobsOverview();
+
                return allJobsFuture.thenCombine(
                        taskManagerOverviewFuture,
-                       (Collection<JobStatus> allJobsStatus, ResourceOverview 
resourceOverview) ->
-                               ClusterOverview.create(resourceOverview, 
allJobsStatus));
+                       (Collection<JobStatus> runningJobsStatus, 
ResourceOverview resourceOverview) -> {
+                               final JobsOverview allJobsOverview = 
JobsOverview.create(runningJobsStatus).combine(completedJobsOverview);
+                               return new ClusterOverview(resourceOverview, 
allJobsOverview);
+                       });
+
        }
 
        @Override
@@ -325,9 +335,16 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                CompletableFuture<Collection<JobDetails>> combinedJobDetails = 
FutureUtils.combineAll(individualJobDetails);
 
+               final Collection<JobDetails> completedJobDetails = 
archivedExecutionGraphStore.getAvailableJobDetails();
+
                return combinedJobDetails.thenApply(
-                       (Collection<JobDetails> jobDetails) ->
-                               new MultipleJobsDetails(jobDetails));
+                       (Collection<JobDetails> runningJobDetails) -> {
+                               final Collection<JobDetails> allJobDetails = 
new ArrayList<>(completedJobDetails.size() + runningJobDetails.size());
+                               allJobDetails.addAll(runningJobDetails);
+                               allJobDetails.addAll(completedJobDetails);
+                               return new MultipleJobsDetails(allJobDetails);
+                       });
+
        }
 
        @Override
@@ -335,7 +352,14 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
 
                if (jobManagerRunner == null) {
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+                       final ArchivedExecutionGraph serializableExecutionGraph 
= archivedExecutionGraphStore.get(jobId);
+
+                       // check whether it is a completed job
+                       if (serializableExecutionGraph == null) {
+                               return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+                       } else {
+                               return 
CompletableFuture.completedFuture(serializableExecutionGraph);
+                       }
                } else {
                        return 
jobManagerRunner.getJobManagerGateway().requestJob(jobId, timeout);
                }
@@ -486,11 +510,25 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                fatalErrorHandler.onFatalError(throwable);
        }
 
-       private void jobReachedGloballyTerminalState(AccessExecutionGraph 
accessExecutionGraph) {
-               final JobResult jobResult = 
JobResult.createFrom(accessExecutionGraph);
+       private void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
+               Preconditions.checkArgument(
+                       
archivedExecutionGraph.getState().isGloballyTerminalState(),
+                       "Job " + archivedExecutionGraph.getJobID() + " is in 
state " +
+                               archivedExecutionGraph.getState() + " which is 
not globally terminal.");
+               final JobResult jobResult = 
JobResult.createFrom(archivedExecutionGraph);
 
                jobExecutionResultCache.put(jobResult);
-               final JobID jobId = accessExecutionGraph.getJobID();
+               final JobID jobId = archivedExecutionGraph.getJobID();
+
+               try {
+                       archivedExecutionGraphStore.put(archivedExecutionGraph);
+               } catch (IOException e) {
+                       log.info(
+                               "Could not store completed job {}({}).",
+                               archivedExecutionGraph.getJobName(),
+                               archivedExecutionGraph.getJobID(),
+                               e);
+               }
 
                try {
                        removeJob(jobId, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
new file mode 100644
index 0000000..8db4fac
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived 
execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for 
faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+       private final File storageDir;
+
+       private final Cache<JobID, JobDetails> jobDetailsCache;
+
+       private final LoadingCache<JobID, ArchivedExecutionGraph> 
archivedExecutionGraphCache;
+
+       private final ScheduledFuture<?> cleanupFuture;
+
+       private final Thread shutdownHook;
+
+       private int numFinishedJobs;
+
+       private int numFailedJobs;
+
+       private int numCanceledJobs;
+
+       public FileArchivedExecutionGraphStore(
+                       File rootDir,
+                       Time expirationTime,
+                       long maximumCacheSizeBytes,
+                       ScheduledExecutor scheduledExecutor,
+                       Ticker ticker) throws IOException {
+
+               final File storageDirectory = 
initExecutionGraphStorageDirectory(rootDir);
+
+               LOG.info(
+                       "Initializing {}: Storage directory {}, expiration time 
{}, maximum cache size {} bytes.",
+                       FileArchivedExecutionGraphStore.class.getSimpleName(),
+                       storageDirectory,
+                       expirationTime.toMilliseconds(),
+                       maximumCacheSizeBytes);
+
+               this.storageDir = Preconditions.checkNotNull(storageDirectory);
+               Preconditions.checkArgument(
+                       storageDirectory.exists() && 
storageDirectory.isDirectory(),
+                       "The storage directory must exist and be a directory.");
+               this.jobDetailsCache = CacheBuilder.newBuilder()
+                       .expireAfterWrite(expirationTime.toMilliseconds(), 
TimeUnit.MILLISECONDS)
+                       .removalListener(
+                               (RemovalListener<JobID, JobDetails>) 
notification -> deleteExecutionGraphFile(notification.getKey()))
+                       .ticker(ticker)
+                       .build();
+
+               this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
+                       .maximumWeight(maximumCacheSizeBytes)
+                       .weigher(this::calculateSize)
+                       .build(new CacheLoader<JobID, ArchivedExecutionGraph>() 
{
+                               @Override
+                               public ArchivedExecutionGraph load(JobID jobId) 
throws Exception {
+                                       return loadExecutionGraph(jobId);
+                               }});
+
+               this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
+                       jobDetailsCache::cleanUp,
+                       expirationTime.toMilliseconds(),
+                       expirationTime.toMilliseconds(),
+                       TimeUnit.MILLISECONDS);
+
+               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+
+               this.numFinishedJobs = 0;
+               this.numFailedJobs = 0;
+               this.numCanceledJobs = 0;
+       }
+
+       @Override
+       public int size() {
+               return Math.toIntExact(jobDetailsCache.size());
+       }
+
+       @Override
+       @Nullable
+       public ArchivedExecutionGraph get(JobID jobId) {
+               try {
+                       return archivedExecutionGraphCache.get(jobId);
+               } catch (ExecutionException e) {
+                       LOG.debug("Could not load archived execution graph for 
job id {}.", jobId, e);
+                       return null;
+               }
+       }
+
+       @Override
+       public void put(ArchivedExecutionGraph archivedExecutionGraph) throws 
IOException {
+               final JobStatus jobStatus = archivedExecutionGraph.getState();
+               final JobID jobId = archivedExecutionGraph.getJobID();
+               final String jobName = archivedExecutionGraph.getJobName();
+
+               Preconditions.checkArgument(
+                       jobStatus.isGloballyTerminalState(),
+                       "The job " + jobName + '(' + jobId +
+                               ") is not in a globally terminal state. Instead 
it is in state " + jobStatus + '.');
+
+               switch (jobStatus) {
+                       case FINISHED:
+                               numFinishedJobs++;
+                               break;
+                       case CANCELED:
+                               numCanceledJobs++;
+                               break;
+                       case FAILED:
+                               numFailedJobs++;
+                               break;
+                       default:
+                               throw new IllegalStateException("The job " + 
jobName + '(' +
+                                       jobId + ") should have been in a 
globally terminal state. " +
+                                       "Instead it was in state " + jobStatus 
+ '.');
+               }
+
+               // write the ArchivedExecutionGraph to disk
+               storeArchivedExecutionGraph(archivedExecutionGraph);
+
+               final JobDetails detailsForJob = 
WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
+
+               jobDetailsCache.put(jobId, detailsForJob);
+               archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
+       }
+
+       @Override
+       public JobsOverview getStoredJobsOverview() {
+               return new JobsOverview(0, numFinishedJobs, numCanceledJobs, 
numFailedJobs);
+       }
+
+       @Override
+       public Collection<JobDetails> getAvailableJobDetails() {
+               return jobDetailsCache.asMap().values();
+       }
+
+       @Override
+       public void close() throws IOException {
+               cleanupFuture.cancel(false);
+
+               jobDetailsCache.invalidateAll();
+
+               // clean up the storage directory
+               FileUtils.deleteFileOrDirectory(storageDir);
+
+               // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the
+               // shutdown hook itself
+               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                       try {
+                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                       }
+                       catch (IllegalStateException e) {
+                               // race, JVM is in shutdown already, we can 
safely ignore this
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Exception while unregistering 
FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
+                       }
+               }
+       }
+
+       // --------------------------------------------------------------
+       // Internal methods
+       // --------------------------------------------------------------
+
+       private int calculateSize(JobID jobId, ArchivedExecutionGraph 
serializableExecutionGraph) {
+               final File archivedExecutionGraphFile = 
getExecutionGraphFile(jobId);
+
+               if (archivedExecutionGraphFile.exists()) {
+                       return 
Math.toIntExact(archivedExecutionGraphFile.length());
+               } else {
+                       LOG.debug("Could not find archived execution graph file 
for {}. Estimating the size instead.", jobId);
+                       return 
serializableExecutionGraph.getAllVertices().size() * 1000 +
+                               
serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
+               }
+       }
+
+       private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws 
IOException, ClassNotFoundException {
+               final File archivedExecutionGraphFile = 
getExecutionGraphFile(jobId);
+
+               if (archivedExecutionGraphFile.exists()) {
+                       try (FileInputStream fileInputStream = new 
FileInputStream(archivedExecutionGraphFile)) {
+                               return 
InstantiationUtil.deserializeObject(fileInputStream, 
getClass().getClassLoader());
+                       }
+               } else {
+                       throw new FileNotFoundException("Could not find file 
for archived execution graph " + jobId +
+                               ". This indicates that the file either has been 
deleted or never written.");
+               }
+       }
+
+       private void storeArchivedExecutionGraph(ArchivedExecutionGraph 
archivedExecutionGraph) throws IOException {
+               final File archivedExecutionGraphFile = 
getExecutionGraphFile(archivedExecutionGraph.getJobID());
+
+               try (FileOutputStream fileOutputStream = new 
FileOutputStream(archivedExecutionGraphFile)) {
+                       InstantiationUtil.serializeObject(fileOutputStream, 
archivedExecutionGraph);
+               }
+       }
+
+       private File getExecutionGraphFile(JobID jobId) {
+               return new File(storageDir, jobId.toString());
+       }
+
+       private void deleteExecutionGraphFile(JobID jobId) {
+               Preconditions.checkNotNull(jobId);
+
+               final File archivedExecutionGraphFile = 
getExecutionGraphFile(jobId);
+
+               try {
+                       
FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
+               } catch (IOException e) {
+                       LOG.debug("Could not delete file {}.", 
archivedExecutionGraphFile, e);
+               }
+
+               archivedExecutionGraphCache.invalidate(jobId);
+               jobDetailsCache.invalidate(jobId);
+       }
+
+       private static File initExecutionGraphStorageDirectory(File tmpDir) 
throws IOException {
+               final int maxAttempts = 10;
+
+               for (int attempt = 0; attempt < maxAttempts; attempt++) {
+                       final File storageDirectory = new File(tmpDir, 
"executionGraphStore-" + UUID.randomUUID());
+
+                       if (storageDirectory.mkdir()) {
+                               return storageDirectory;
+                       }
+               }
+
+               throw new IOException("Could not create executionGraphStorage 
directory in " + tmpDir + '.');
+       }
+
+       // --------------------------------------------------------------
+       // Testing methods
+       // --------------------------------------------------------------
+
+       @VisibleForTesting
+       File getStorageDir() {
+               return storageDir;
+       }
+
+       @VisibleForTesting
+       LoadingCache<JobID, ArchivedExecutionGraph> 
getArchivedExecutionGraphCache() {
+               return archivedExecutionGraphCache;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index c64c883..b71b6bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -50,6 +50,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
+                       ArchivedExecutionGraphStore archivedExecutionGraphStore,
                        FatalErrorHandler fatalErrorHandler,
                        @Nullable String restAddress) throws Exception {
                super(
@@ -61,6 +62,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
+                       archivedExecutionGraphStore,
                        fatalErrorHandler,
                        restAddress);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0628281..0b1cea0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -20,13 +20,18 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -47,10 +52,14 @@ import 
org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+
 import akka.actor.ActorSystem;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.Executor;
 
 /**
@@ -68,6 +77,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
 
        private DispatcherRestEndpoint dispatcherRestEndpoint;
 
+       private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+
        public SessionClusterEntrypoint(Configuration configuration) {
                super(configuration);
        }
@@ -81,6 +92,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry) throws Exception {
 
+               archivedExecutionGraphStore = 
createSerializableExecutionGraphStore(configuration, 
rpcService.getScheduledExecutor());
+
                dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
 
                resourceManagerRetrievalService = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
@@ -131,6 +144,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
+                       archivedExecutionGraphStore,
                        this,
                        dispatcherRestEndpoint.getRestAddress());
 
@@ -143,6 +157,22 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
        }
 
+       private ArchivedExecutionGraphStore 
createSerializableExecutionGraphStore(
+                       Configuration configuration,
+                       ScheduledExecutor scheduledExecutor) throws IOException 
{
+               final File tmpDir = new 
File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
+
+               final Time expirationTime =  
Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
+               final long maximumCacheSizeBytes = 
configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);
+
+               return new FileArchivedExecutionGraphStore(
+                       tmpDir,
+                       expirationTime,
+                       maximumCacheSizeBytes,
+                       scheduledExecutor,
+                       Ticker.systemTicker());
+       }
+
        @Override
        protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
                Throwable exception = null;
@@ -183,6 +213,14 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        }
                }
 
+               if (archivedExecutionGraphStore != null) {
+                       try {
+                               archivedExecutionGraphStore.close();
+                       } catch (Throwable t) {
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       }
+               }
+
                if (exception != null) {
                        throw new FlinkException("Could not properly shut down 
the session cluster entry point.", exception);
                }
@@ -215,6 +253,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
+               ArchivedExecutionGraphStore archivedExecutionGraphStore,
                FatalErrorHandler fatalErrorHandler,
                @Nullable String restAddress) throws Exception {
 
@@ -228,6 +267,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
+                       archivedExecutionGraphStore,
                        fatalErrorHandler,
                        restAddress);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0c8ee16..6d22e75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -72,6 +72,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
@@ -824,7 +825,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                Collection<JobStatus> jobStatuses = 
Collections.singleton(executionGraph.getState());
 
                return resourceOverviewFuture.thenApply(
-                       (ResourceOverview resourceOverview) -> 
ClusterOverview.create(resourceOverview, jobStatuses));
+                       (ResourceOverview resourceOverview) -> new 
ClusterOverview(
+                               resourceOverview,
+                               JobsOverview.create(jobStatuses)));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 69b45b5..4eedbc0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
-import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
-import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Collection;
-
 /**
  * Response to the {@link RequestStatusOverview} message, carrying a 
description
  * of the Flink cluster status.
@@ -73,6 +69,17 @@ public class ClusterOverview extends JobsOverview {
                this.numSlotsAvailable = numSlotsAvailable;
        }
 
+       public ClusterOverview(ResourceOverview resourceOverview, JobsOverview 
jobsOverview) {
+               this(
+                       resourceOverview.getNumberTaskManagers(),
+                       resourceOverview.getNumberRegisteredSlots(),
+                       resourceOverview.getNumberFreeSlots(),
+                       jobsOverview.getNumJobsRunningOrPending(),
+                       jobsOverview.getNumJobsFinished(),
+                       jobsOverview.getNumJobsCancelled(),
+                       jobsOverview.getNumJobsFailed());
+       }
+
        public int getNumTaskManagersConnected() {
                return numTaskManagersConnected;
        }
@@ -128,40 +135,4 @@ public class ClusterOverview extends JobsOverview {
                                ", numJobsFailed=" + getNumJobsFailed() +
                                '}';
        }
-
-       public static ClusterOverview create(ResourceOverview resourceOverview, 
Collection<JobStatus> allJobsStatus) {
-               Preconditions.checkNotNull(resourceOverview);
-               Preconditions.checkNotNull(allJobsStatus);
-
-               int numberRunningOrPendingJobs = 0;
-               int numberFinishedJobs = 0;
-               int numberCancelledJobs = 0;
-               int numberFailedJobs = 0;
-
-               for (JobStatus status : allJobsStatus) {
-                       switch (status) {
-                               case FINISHED:
-                                       numberFinishedJobs++;
-                                       break;
-                               case FAILED:
-                                       numberFailedJobs++;
-                                       break;
-                               case CANCELED:
-                                       numberCancelledJobs++;
-                                       break;
-                               default:
-                                       numberRunningOrPendingJobs++;
-                                       break;
-                       }
-               }
-
-               return new ClusterOverview(
-                       resourceOverview.getNumberTaskManagers(),
-                       resourceOverview.getNumberRegisteredSlots(),
-                       resourceOverview.getNumberFreeSlots(),
-                       numberRunningOrPendingJobs,
-                       numberFinishedJobs,
-                       numberCancelledJobs,
-                       numberFailedJobs);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
index 3834a76..1d65245 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.Preconditions;
+
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collection;
+
 /**
  * An overview of how many jobs are in which status.
  */
@@ -51,7 +56,7 @@ public class JobsOverview implements InfoMessage {
                        @JsonProperty(FIELD_NAME_JOBS_FINISHED) int 
numJobsFinished,
                        @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int 
numJobsCancelled,
                        @JsonProperty(FIELD_NAME_JOBS_FAILED) int 
numJobsFailed) {
-               
+
                this.numJobsRunningOrPending = numJobsRunningOrPending;
                this.numJobsFinished = numJobsFinished;
                this.numJobsCancelled = numJobsCancelled;
@@ -80,9 +85,9 @@ public class JobsOverview implements InfoMessage {
        public int getNumJobsFailed() {
                return numJobsFailed;
        }
-       
+
        // 
------------------------------------------------------------------------
-       
+
        @Override
        public boolean equals(Object obj) {
                if (this == obj) {
@@ -118,4 +123,42 @@ public class JobsOverview implements InfoMessage {
                                ", numJobsFailed=" + numJobsFailed +
                                '}';
        }
+
+       /**
+        * Combines the given jobs overview with this.
+        *
+        * @param jobsOverview to combine with this
+        * @return Combined jobs overview
+        */
+       public JobsOverview combine(JobsOverview jobsOverview) {
+               return new JobsOverview(this, jobsOverview);
+       }
+
+       public static JobsOverview create(Collection<JobStatus> allJobsStatus) {
+               Preconditions.checkNotNull(allJobsStatus);
+
+               int numberRunningOrPendingJobs = 0;
+               int numberFinishedJobs = 0;
+               int numberCancelledJobs = 0;
+               int numberFailedJobs = 0;
+
+               for (JobStatus status : allJobsStatus) {
+                       switch (status) {
+                               case FINISHED:
+                                       numberFinishedJobs++;
+                                       break;
+                               case FAILED:
+                                       numberFailedJobs++;
+                                       break;
+                               case CANCELED:
+                                       numberCancelledJobs++;
+                                       break;
+                               default:
+                                       numberRunningOrPendingJobs++;
+                                       break;
+                       }
+               }
+
+               return new JobsOverview(numberRunningOrPendingJobs, 
numberFinishedJobs, numberCancelledJobs, numberFailedJobs);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
new file mode 100644
index 0000000..1fc7705
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends 
ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+       private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = 
new ConcurrentLinkedQueue<>();
+
+       @Override
+       public ScheduledFuture<?> schedule(Runnable command, long delay, 
TimeUnit unit) {
+               return insertRunnable(command, false);
+       }
+
+       @Override
+       public <V> ScheduledFuture<V> schedule(Callable<V> callable, long 
delay, TimeUnit unit) {
+               final ScheduledTask<V> scheduledTask = new 
ScheduledTask<>(callable, false);
+
+               scheduledTasks.offer(scheduledTask);
+
+               return scheduledTask;
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+               return insertRunnable(command, true);
+       }
+
+       @Override
+       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+               return insertRunnable(command, true);
+       }
+
+       /**
+        * Triggers all registered tasks.
+        */
+       public void triggerScheduledTasks() {
+               final Iterator<ScheduledTask<?>> iterator = 
scheduledTasks.iterator();
+
+               while (iterator.hasNext()) {
+                       final ScheduledTask<?> scheduledTask = iterator.next();
+
+                       scheduledTask.execute();
+
+                       if (!scheduledTask.isPeriodic) {
+                               iterator.remove();
+                       }
+               }
+       }
+
+       private ScheduledFuture<?> insertRunnable(Runnable command, boolean 
isPeriodic) {
+               final ScheduledTask<?> scheduledTask = new ScheduledTask<>(
+                       () -> {
+                               command.run();
+                               return null;
+                       },
+                       isPeriodic);
+
+               scheduledTasks.offer(scheduledTask);
+
+               return scheduledTask;
+       }
+
+       private static final class ScheduledTask<T> implements 
ScheduledFuture<T> {
+
+               private final Callable<T> callable;
+
+               private final boolean isPeriodic;
+
+               private final CompletableFuture<T> result;
+
+               private ScheduledTask(Callable<T> callable, boolean isPeriodic) 
{
+                       this.callable = Preconditions.checkNotNull(callable);
+                       this.isPeriodic = isPeriodic;
+
+                       this.result = new CompletableFuture<>();
+               }
+
+               public void execute() {
+                       if (!result.isDone()) {
+                               if (!isPeriodic) {
+                                       try {
+                                               
result.complete(callable.call());
+                                       } catch (Exception e) {
+                                               result.completeExceptionally(e);
+                                       }
+                               } else {
+                                       try {
+                                               callable.call();
+                                       } catch (Exception e) {
+                                               result.completeExceptionally(e);
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public long getDelay(TimeUnit unit) {
+                       return 0;
+               }
+
+               @Override
+               public int compareTo(Delayed o) {
+                       return 0;
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       return result.cancel(mayInterruptIfRunning);
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return result.isCancelled();
+               }
+
+               @Override
+               public boolean isDone() {
+                       return result.isDone();
+               }
+
+               @Override
+               public T get() throws InterruptedException, ExecutionException {
+                       return result.get();
+               }
+
+               @Override
+               public T get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+                       return result.get(timeout, unit);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 76a3c98..c9b9bfb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -173,6 +173,7 @@ public class DispatcherTest extends TestLogger {
                        new BlobServer(blobServerConfig, new VoidBlobStore()),
                        heartbeatServices,
                        NoOpMetricRegistry.INSTANCE,
+                       new MemoryArchivedExecutionGraphStore(),
                        fatalErrorHandler,
                        TEST_JOB_ID);
 
@@ -344,6 +345,7 @@ public class DispatcherTest extends TestLogger {
                                BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
                                MetricRegistry metricRegistry,
+                               ArchivedExecutionGraphStore 
archivedExecutionGraphStore,
                                FatalErrorHandler fatalErrorHandler,
                                JobID expectedJobId) throws Exception {
                        super(
@@ -355,6 +357,7 @@ public class DispatcherTest extends TestLogger {
                                blobServer,
                                heartbeatServices,
                                metricRegistry,
+                               archivedExecutionGraphStore,
                                fatalErrorHandler,
                                null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
new file mode 100644
index 0000000..e6d700e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
/**
 * Tests for the {@link FileArchivedExecutionGraphStore} which persists
 * {@link ArchivedExecutionGraph} instances to disk and keeps a bounded
 * in-memory cache in front of the persisted files.
 */
@Category(Flip6.class)
public class FileArchivedExecutionGraphStoreTest extends TestLogger {

	// All job statuses counting as globally terminal; only such graphs are generated for
	// the tests because the store is populated with terminated jobs by the Dispatcher.
	private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = Arrays.stream(JobStatus.values())
		.filter(JobStatus::isGloballyTerminalState)
		.collect(Collectors.toList());

	// Shared across all tests; each test creates its own sub-folder via newFolder().
	@ClassRule
	public static TemporaryFolder temporaryFolder = new TemporaryFolder();

	/**
	 * Tests that we can put {@link ArchivedExecutionGraph} into the
	 * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
	 */
	@Test
	public void testPut() throws IOException {
		final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
		final File rootDir = temporaryFolder.newFolder();

		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {

			final File storageDirectory = executionGraphStore.getStorageDir();

			// check that the storage directory is empty
			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));

			executionGraphStore.put(dummyExecutionGraph);

			// check that we have persisted the given execution graph
			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));

			// only a field-wise partial comparison is possible; presumably
			// ArchivedExecutionGraph does not define full equals() — see matcher below
			assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
		}
	}

	/**
	 * Tests that null is returned if we request an unknown JobID.
	 */
	@Test
	public void testUnknownGet() throws IOException {
		final File rootDir = temporaryFolder.newFolder();

		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
			assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
		}
	}

	/**
	 * Tests that we obtain the correct jobs overview (aggregated job status counts)
	 * after inserting a set of terminated execution graphs.
	 */
	@Test
	public void testStoredJobsOverview() throws IOException {
		final int numberExecutionGraphs = 10;
		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);

		final List<JobStatus> jobStatuses = executionGraphs.stream().map(ArchivedExecutionGraph::getState).collect(Collectors.toList());

		final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);

		final File rootDir = temporaryFolder.newFolder();

		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
				executionGraphStore.put(executionGraph);
			}

			assertThat(executionGraphStore.getStoredJobsOverview(), Matchers.equalTo(expectedJobsOverview));
		}
	}

	/**
	 * Tests that we obtain the correct collection of available job details.
	 */
	@Test
	public void testAvailableJobDetails() throws IOException {
		final int numberExecutionGraphs = 10;
		final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs);

		final Collection<JobDetails> jobDetails = executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList());

		final File rootDir = temporaryFolder.newFolder();

		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
				executionGraphStore.put(executionGraph);
			}

			// order is unspecified, hence containsInAnyOrder
			assertThat(executionGraphStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(jobDetails.toArray()));
		}
	}

	/**
	 * Tests that an expired execution graph is removed from the execution graph store.
	 *
	 * <p>Time is controlled deterministically: the {@link ManualTicker} drives the
	 * expiration clock and the {@link ManuallyTriggeredScheduledExecutor} controls
	 * when the periodic cleanup actually runs.
	 */
	@Test
	public void testExecutionGraphExpiration() throws Exception {
		final File rootDir = temporaryFolder.newFolder();

		final Time expirationTime = Time.milliseconds(1L);

		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();

		final ManualTicker manualTicker = new ManualTicker();

		try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore(
			rootDir,
			expirationTime,
			10000L,
			scheduledExecutor,
			manualTicker)) {

			final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();

			executionGraphStore.put(executionGraph);

			// there should be one execution graph
			assertThat(executionGraphStore.size(), Matchers.equalTo(1));

			manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS);

			// this should trigger the cleanup after expiration
			scheduledExecutor.triggerScheduledTasks();

			assertThat(executionGraphStore.size(), Matchers.equalTo(0));

			assertThat(executionGraphStore.get(executionGraph.getJobID()), Matchers.nullValue());

			final File storageDirectory = executionGraphStore.getStorageDir();

			// check that the persisted file has been deleted
			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
		}
	}

	/**
	 * Tests that all persisted files are cleaned up after closing the store.
	 */
	@Test
	public void testCloseCleansUp() throws IOException {
		final File rootDir = temporaryFolder.newFolder();

		assertThat(rootDir.listFiles().length, Matchers.equalTo(0));

		try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {

			// the store creates its own storage sub-directory below rootDir
			assertThat(rootDir.listFiles().length, Matchers.equalTo(1));

			final File storageDirectory = executionGraphStore.getStorageDir();

			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));

			executionGraphStore.put(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build());

			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
		}

		// closing the store must remove the storage directory and all persisted files
		assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
	}

	/**
	 * Tests that evicted {@link ArchivedExecutionGraph} are loaded from disk again.
	 */
	@Test
	public void testCacheLoading() throws IOException {
		final File rootDir = temporaryFolder.newFolder();

		try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore(
			rootDir,
			Time.hours(1L),
			// 100 KiB — presumably the in-memory cache capacity; small enough to
			// force evictions quickly. TODO confirm against the constructor contract.
			100L << 10,
			TestingUtils.defaultScheduledExecutor(),
			Ticker.systemTicker())) {

			final LoadingCache<JobID, ArchivedExecutionGraph> executionGraphCache = executionGraphStore.getArchivedExecutionGraphCache();

			Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(64);

			boolean continueInserting = true;

			// insert execution graphs until the first one got evicted
			while (continueInserting) {
				// has roughly a size of 1.4 KB
				final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();

				executionGraphStore.put(executionGraph);

				executionGraphs.add(executionGraph);

				// once the cache size falls behind the number of inserted graphs,
				// at least one entry has been evicted from memory
				continueInserting = executionGraphCache.size() == executionGraphs.size();
			}

			final File storageDirectory = executionGraphStore.getStorageDir();

			// every graph must still be on disk, evicted or not
			assertThat(storageDirectory.listFiles().length, Matchers.equalTo(executionGraphs.size()));

			// all graphs — including the evicted ones — must be retrievable,
			// which forces a reload from disk for the evicted entries
			for (ArchivedExecutionGraph executionGraph : executionGraphs) {
				assertThat(executionGraphStore.get(executionGraph.getJobID()), matchesPartiallyWith(executionGraph));
			}
		}
	}

	/**
	 * Creates the given number of dummy {@link ArchivedExecutionGraph}s, each with a
	 * randomly chosen globally terminal {@link JobStatus}.
	 */
	private Collection<ArchivedExecutionGraph> generateTerminalExecutionGraphs(int number) {
		final Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(number);

		for (int i = 0; i < number; i++) {
			final JobStatus state = GLOBALLY_TERMINAL_JOB_STATUS.get(ThreadLocalRandom.current().nextInt(GLOBALLY_TERMINAL_JOB_STATUS.size()));
			executionGraphs.add(
				new ArchivedExecutionGraphBuilder()
					.setState(state)
					.build());
		}

		return executionGraphs;
	}

	/**
	 * Creates a store with generous defaults (1 h expiration, 10000 byte limit —
	 * presumably the cache capacity, see constructor) under the given directory.
	 */
	private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File storageDirectory) throws IOException {
		return new FileArchivedExecutionGraphStore(
			storageDirectory,
			Time.hours(1L),
			10000L,
			TestingUtils.defaultScheduledExecutor(),
			Ticker.systemTicker());
	}

	/**
	 * {@link Ticker} whose time only advances when {@link #advanceTime} is called,
	 * giving the tests deterministic control over cache expiration.
	 */
	private static final class ManualTicker extends Ticker {

		// current time in nanoseconds, as required by the Ticker contract
		private long currentTime = 0;

		@Override
		public long read() {
			return currentTime;
		}

		void advanceTime(long duration, TimeUnit timeUnit) {
			currentTime += timeUnit.toNanos(duration);
		}
	}

	/**
	 * Matcher comparing a selected subset of {@link ArchivedExecutionGraph} fields
	 * (job id, name, state, plan, accumulators, checkpoint config, vertex count).
	 * Used instead of equality; presumably {@link ArchivedExecutionGraph} does not
	 * implement a full {@code equals()} — verify against the class if extending.
	 */
	private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> {

		private final ArchivedExecutionGraph archivedExecutionGraph;

		private PartialArchivedExecutionGraphMatcher(ArchivedExecutionGraph expectedArchivedExecutionGraph) {
			this.archivedExecutionGraph = Preconditions.checkNotNull(expectedArchivedExecutionGraph);
		}

		@Override
		public boolean matches(Object o) {
			if (archivedExecutionGraph == o) {
				return true;
			}
			if (o == null || archivedExecutionGraph.getClass() != o.getClass()) {
				return false;
			}
			ArchivedExecutionGraph that = (ArchivedExecutionGraph) o;
			return archivedExecutionGraph.isStoppable() == that.isStoppable() &&
				Objects.equals(archivedExecutionGraph.getJobID(), that.getJobID()) &&
				Objects.equals(archivedExecutionGraph.getJobName(), that.getJobName()) &&
				archivedExecutionGraph.getState() == that.getState() &&
				Objects.equals(archivedExecutionGraph.getJsonPlan(), that.getJsonPlan()) &&
				Objects.equals(archivedExecutionGraph.getAccumulatorsSerialized(), that.getAccumulatorsSerialized()) &&
				Objects.equals(archivedExecutionGraph.getCheckpointCoordinatorConfiguration(), that.getCheckpointCoordinatorConfiguration()) &&
				archivedExecutionGraph.getAllVertices().size() == that.getAllVertices().size();
		}

		@Override
		public void describeTo(Description description) {
			description.appendText("Matches against " + ArchivedExecutionGraph.class.getSimpleName() + '.');
		}
	}

	// Convenience factory for the partial matcher above.
	private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(ArchivedExecutionGraph executionGraph) {
		return new PartialArchivedExecutionGraphMatcher(executionGraph);
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
new file mode 100644
index 0000000..9bfdbb3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ArchivedExecutionGraphStore} implementation which stores the {@link 
ArchivedExecutionGraph}
+ * in memory.
+ */
+public class MemoryArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+       private final Map<JobID, ArchivedExecutionGraph> 
serializableExecutionGraphs = new HashMap<>(4);
+
+       @Override
+       public int size() {
+               return serializableExecutionGraphs.size();
+       }
+
+       @Nullable
+       @Override
+       public ArchivedExecutionGraph get(JobID jobId) {
+               return serializableExecutionGraphs.get(jobId);
+       }
+
+       @Override
+       public void put(ArchivedExecutionGraph serializableExecutionGraph) 
throws IOException {
+               
serializableExecutionGraphs.put(serializableExecutionGraph.getJobID(), 
serializableExecutionGraph);
+       }
+
+       @Override
+       public JobsOverview getStoredJobsOverview() {
+               Collection<JobStatus> allJobStatus = 
serializableExecutionGraphs.values().stream()
+                       .map(ArchivedExecutionGraph::getState)
+                       .collect(Collectors.toList());
+
+               return JobsOverview.create(allJobStatus);
+       }
+
+       @Override
+       public Collection<JobDetails> getAvailableJobDetails() {
+               return serializableExecutionGraphs.values().stream()
+                       .map(WebMonitorUtils::createDetailsForJob)
+                       .collect(Collectors.toList());
+       }
+
+       @Override
+       public void close() throws IOException {
+               serializableExecutionGraphs.clear();
+       }
+}

Reply via email to