[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5310


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164087390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived 
execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for 
faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache jobDetailsCache;
+
+   private final LoadingCache 
archivedExecutionGraphCache;
+
+   private final ScheduledFuture cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+   File rootDir,
+   Time expirationTime,
+   long maximumCacheSizeBytes,
+   ScheduledExecutor scheduledExecutor) throws IOException 
{
+
+   final File storageDirectory = 
initExecutionGraphStorageDirectory(rootDir);
+
+   LOG.info(
+   "Initializing {}: Storage directory {}, expiration time 
{}, maximum cache size {} bytes.",
+   FileArchivedExecutionGraphStore.class.getSimpleName(),
+   storageDirectory,
+   expirationTime.toMilliseconds(),
+   maximumCacheSizeBytes);
+
+   this.storageDir = Preconditions.checkNotNull(storageDirectory);
+   Preconditions.checkArgument(
+   storageDirectory.exists() && 
storageDirectory.isDirectory(),
+   "The storage directory must exist and be a directory.");
+   this.jobDetailsCache = CacheBuilder.newBuilder()
+   .expireAfterWrite(expirationTime.toMilliseconds(),

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164062610
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+* Tests that we can put {@link ArchivedExecutionGraph} into the
+* {@link FileArchivedExecutionGraphStore} and that the graph is 
persisted.
+*/
+   @Test
+   public void testPut() throws IOException {
+   final ArchivedExecutionGraph dummyExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+   final File rootDir = temporaryFolder.newFolder();
+
+   try (final FileArchivedExecutionGraphStore executionGraphStore 
= createDefaultExecutionGraphStore(rootDir)) {
+
+   final File storageDirectory = 
executionGraphStore.getStorageDir();
+
+   // check that the storage directory is empty
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(0));
+
+   executionGraphStore.put(dummyExecutionGraph);
+
+   // check that we have persisted the given execution 
graph
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(1));
+
+   
assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new 
PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+   }
+   }
+
+   /**
+* Tests that null is returned if we request an unknown JobID.
+*/
+   @Test
+   public void testUnknownGet() throws IOExce

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164058994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

Good point. Will change it.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164058010
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
--- End diff --

You're right, will change it.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164057896
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends 
ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+   private final ConcurrentLinkedQueue> scheduledTasks = 
new ConcurrentLinkedQueue<>();
+
+   @Override
+   public ScheduledFuture schedule(Runnable command, long delay, 
TimeUnit unit) {
+   return insertRunnable(command, false);
+   }
+
+   @Override
+   public  ScheduledFuture schedule(Callable callable, long 
delay, TimeUnit unit) {
+   final ScheduledTask scheduledTask = new 
ScheduledTask<>(callable, false);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   @Override
+   public ScheduledFuture scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   @Override
+   public ScheduledFuture scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   /**
+* Triggers all registered tasks.
+*/
+   public void triggerScheduledTasks() {
+   final Iterator> iterator = 
scheduledTasks.iterator();
+
+   while (iterator.hasNext()) {
+   final ScheduledTask scheduledTask = iterator.next();
+
+   scheduledTask.execute();
+
+   if (!scheduledTask.isPeriodic) {
+   iterator.remove();
+   }
+   }
+   }
+
+   private ScheduledFuture insertRunnable(Runnable command, boolean 
isPeriodic) {
+   final ScheduledTask scheduledTask = new ScheduledTask<>(
+   () -> {
+   command.run();
+   return null;
+   },
+   isPeriodic);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   private static final class ScheduledTask implements 
ScheduledFuture {
+
+   private final Callable callable;
+
+   private final boolean isPeriodic;
+
+   private final CompletableFuture result;
+
+   private ScheduledTask(Callable callable, boolean isPeriodic) 
{
+   this.callable = Preconditions.checkNotNull(callable);
+   this.isPeriodic = isPeriodic;
+
+   this.result = new CompletableFuture<>();
+   }
+
+   public boolean isPeriodic() {
--- End diff --

Will remove it.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164057811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived 
execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for 
faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache jobDetailsCache;
+
+   private final LoadingCache 
archivedExecutionGraphCache;
+
+   private final ScheduledFuture cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+   File rootDir,
+   Time expirationTime,
+   long maximumCacheSizeBytes,
+   ScheduledExecutor scheduledExecutor) throws IOException 
{
+
+   final File storageDirectory = 
initExecutionGraphStorageDirectory(rootDir);
+
+   LOG.info(
+   "Initializing {}: Storage directory {}, expiration time 
{}, maximum cache size {} bytes.",
+   FileArchivedExecutionGraphStore.class.getSimpleName(),
+   storageDirectory,
+   expirationTime.toMilliseconds(),
+   maximumCacheSizeBytes);
+
+   this.storageDir = Preconditions.checkNotNull(storageDirectory);
+   Preconditions.checkArgument(
+   storageDirectory.exists() && 
storageDirectory.isDirectory(),
+   "The storage directory must exist and be a directory.");
+   this.jobDetailsCache = CacheBuilder.newBuilder()
+   .expireAfterWrite(expirationTime.toMillis

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164056776
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
 
-   private void jobReachedGloballyTerminalState(AccessExecutionGraph 
accessExecutionGraph) {
-   final JobResult jobResult = 
JobResult.createFrom(accessExecutionGraph);
+   private void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
+   
Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(),
 "");
--- End diff --

Good catch. Will write a proper error message.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919241
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

With `ThreadLocalRandom.current().nextInt(...)` you already have an 
available random instance which does not suffer from lock contention problems.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163918048
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
--- End diff --

It looks like a constant, i.e., it shouldn't be mutable.
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = 
Collections.unmodifiableList(
Arrays.stream(JobStatus.values())
.filter(JobStatus::isGloballyTerminalState)
.collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo.



---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163915273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived 
execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for 
faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache jobDetailsCache;
+
+   private final LoadingCache 
archivedExecutionGraphCache;
+
+   private final ScheduledFuture cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+   File rootDir,
+   Time expirationTime,
+   long maximumCacheSizeBytes,
+   ScheduledExecutor scheduledExecutor) throws IOException 
{
+
+   final File storageDirectory = 
initExecutionGraphStorageDirectory(rootDir);
+
+   LOG.info(
+   "Initializing {}: Storage directory {}, expiration time 
{}, maximum cache size {} bytes.",
+   FileArchivedExecutionGraphStore.class.getSimpleName(),
+   storageDirectory,
+   expirationTime.toMilliseconds(),
+   maximumCacheSizeBytes);
+
+   this.storageDir = Preconditions.checkNotNull(storageDirectory);
+   Preconditions.checkArgument(
+   storageDirectory.exists() && 
storageDirectory.isDirectory(),
+   "The storage directory must exist and be a directory.");
+   this.jobDetailsCache = CacheBuilder.newBuilder()
+   .expireAfterWrite(expirationTime.toMilliseconds(),

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163911570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
 
-   private void jobReachedGloballyTerminalState(AccessExecutionGraph 
accessExecutionGraph) {
-   final JobResult jobResult = 
JobResult.createFrom(accessExecutionGraph);
+   private void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
+   
Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(),
 "");
--- End diff --

The `errorMessage` argument is an empty string. Either leave it out
completely or provide a meaningful message.


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163921620
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends 
ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+   private final ConcurrentLinkedQueue> scheduledTasks = 
new ConcurrentLinkedQueue<>();
+
+   @Override
+   public ScheduledFuture schedule(Runnable command, long delay, 
TimeUnit unit) {
+   return insertRunnable(command, false);
+   }
+
+   @Override
+   public  ScheduledFuture schedule(Callable callable, long 
delay, TimeUnit unit) {
+   final ScheduledTask scheduledTask = new 
ScheduledTask<>(callable, false);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   @Override
+   public ScheduledFuture scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   @Override
+   public ScheduledFuture scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+   return insertRunnable(command, true);
+   }
+
+   /**
+* Triggers all registered tasks.
+*/
+   public void triggerScheduledTasks() {
+   final Iterator> iterator = 
scheduledTasks.iterator();
+
+   while (iterator.hasNext()) {
+   final ScheduledTask scheduledTask = iterator.next();
+
+   scheduledTask.execute();
+
+   if (!scheduledTask.isPeriodic) {
+   iterator.remove();
+   }
+   }
+   }
+
+   private ScheduledFuture insertRunnable(Runnable command, boolean 
isPeriodic) {
+   final ScheduledTask scheduledTask = new ScheduledTask<>(
+   () -> {
+   command.run();
+   return null;
+   },
+   isPeriodic);
+
+   scheduledTasks.offer(scheduledTask);
+
+   return scheduledTask;
+   }
+
+   private static final class ScheduledTask implements 
ScheduledFuture {
+
+   private final Callable callable;
+
+   private final boolean isPeriodic;
+
+   private final CompletableFuture result;
+
+   private ScheduledTask(Callable callable, boolean isPeriodic) 
{
+   this.callable = Preconditions.checkNotNull(callable);
+   this.isPeriodic = isPeriodic;
+
+   this.result = new CompletableFuture<>();
+   }
+
+   public boolean isPeriodic() {
--- End diff --

nit: method is unused


---


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163922121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+* Tests that we can put {@link ArchivedExecutionGraph} into the
+* {@link FileArchivedExecutionGraphStore} and that the graph is 
persisted.
+*/
+   @Test
+   public void testPut() throws IOException {
+   final ArchivedExecutionGraph dummyExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+   final File rootDir = temporaryFolder.newFolder();
+
+   try (final FileArchivedExecutionGraphStore executionGraphStore 
= createDefaultExecutionGraphStore(rootDir)) {
+
+   final File storageDirectory = 
executionGraphStore.getStorageDir();
+
+   // check that the storage directory is empty
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(0));
+
+   executionGraphStore.put(dummyExecutionGraph);
+
+   // check that we have persisted the given execution 
graph
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(1));
+
+   
assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new 
PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
+   }
+   }
+
+   /**
+* Tests that null is returned if we request an unknown JobID.
+*/
+   @Test
+   public void testUnknownGet() throws IOException {
 

[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-25 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919921
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List GLOBALLY_TERMINAL_JOB_STATUS = new 
ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+   GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+* Tests that we can put {@link ArchivedExecutionGraph} into the
+* {@link FileArchivedExecutionGraphStore} and that the graph is 
persisted.
+*/
+   @Test
+   public void testPut() throws IOException {
+   final ArchivedExecutionGraph dummyExecutionGraph = new 
ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+   final File rootDir = temporaryFolder.newFolder();
+
+   try (final FileArchivedExecutionGraphStore executionGraphStore 
= createDefaultExecutionGraphStore(rootDir)) {
+
+   final File storageDirectory = 
executionGraphStore.getStorageDir();
+
+   // check that the storage directory is empty
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(0));
+
+   executionGraphStore.put(dummyExecutionGraph);
+
+   // check that we have persisted the given execution 
graph
+   assertThat(storageDirectory.listFiles().length, 
Matchers.equalTo(1));
+
+   
assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new 
PartialArchivedExecutionGraphMatcher(dummyExecutionGraph));
--- End diff --

It is not obvious what the matcher is doing. How about:
`assertThat(..., isPredicateFulfilled(...))`

```
private static Matcher 
isPredicateFulfilled(Ar