[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341126#comment-16341126
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5310


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it
> can use to store completed jobs. This store can then be used to serve
> historic job requests from the web UI, for example. The default
> implementation should persist the jobs to disk and evict the in-memory
> instances once they grow too big in order to avoid memory leaks. Additionally,
> the store should expire elements from disk after a user-defined time.
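
The behaviour described in the issue (persist to disk, bound the in-memory copies by size, expire from disk after a configured time) can be illustrated with a minimal sketch. This is not the code from the pull request: the class name DiskBackedStoreSketch, its methods, and the inTempDir helper are hypothetical, jobs are represented as plain byte arrays, and a JDK LinkedHashMap in access order stands in for the Guava caches that the pull request imports. The sketch is not thread-safe.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: disk-backed store with a byte-bounded LRU memory cache. */
final class DiskBackedStoreSketch {

    private final Path storageDir;
    private final long maxCacheBytes;
    private final long expirationMillis;

    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, byte[]> cache = new LinkedHashMap<>(16, 0.75f, true);
    // Time at which each job was written to disk.
    private final Map<String, Long> storedAt = new HashMap<>();
    private long cachedBytes;

    DiskBackedStoreSketch(Path storageDir, long maxCacheBytes, long expirationMillis) {
        this.storageDir = storageDir;
        this.maxCacheBytes = maxCacheBytes;
        this.expirationMillis = expirationMillis;
    }

    /** Convenience factory for experiments; creates a fresh temporary directory. */
    static DiskBackedStoreSketch inTempDir(long maxCacheBytes, long expirationMillis) {
        try {
            Path dir = Files.createTempDirectory("store-sketch");
            return new DiskBackedStoreSketch(dir, maxCacheBytes, expirationMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Always writes to disk first, then keeps a copy in memory if it fits. */
    void put(String jobId, byte[] serializedJob, long nowMillis) {
        try {
            Files.write(storageDir.resolve(jobId), serializedJob);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        storedAt.put(jobId, nowMillis);
        cacheEntry(jobId, serializedJob);
    }

    /** Serves from memory when possible, otherwise reloads from disk; null if unknown. */
    byte[] get(String jobId) {
        byte[] cached = cache.get(jobId);
        if (cached != null) {
            return cached;
        }
        Path file = storageDir.resolve(jobId);
        if (!Files.exists(file)) {
            return null;
        }
        try {
            byte[] loaded = Files.readAllBytes(file);
            cacheEntry(jobId, loaded);
            return loaded;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes every job stored at least expirationMillis ago from memory and disk. */
    void expire(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = storedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() < expirationMillis) {
                continue;
            }
            byte[] removed = cache.remove(entry.getKey());
            if (removed != null) {
                cachedBytes -= removed.length;
            }
            try {
                Files.deleteIfExists(storageDir.resolve(entry.getKey()));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            it.remove();
        }
    }

    boolean isCached(String jobId) {
        return cache.containsKey(jobId);
    }

    // Inserts the entry, then evicts least recently used entries until the byte budget holds.
    private void cacheEntry(String jobId, byte[] value) {
        byte[] previous = cache.put(jobId, value);
        if (previous != null) {
            cachedBytes -= previous.length;
        }
        cachedBytes += value.length;
        Iterator<Map.Entry<String, byte[]>> it = cache.entrySet().iterator();
        while (cachedBytes > maxCacheBytes && it.hasNext()) {
            cachedBytes -= it.next().getValue().length;
            it.remove();
        }
    }
}
```

Evicting from memory never loses data in this sketch because the disk copy is written before the entry is cached; only expire() deletes files.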



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341114#comment-16341114
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
👍 




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340925#comment-16340925
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164087390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache<JobID, JobDetails> jobDetailsCache;
+
+   private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+
+   private final ScheduledFuture<?> cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+           File rootDir,
+           Time expirationTime,
+           long maximumCacheSizeBytes,
+           ScheduledExecutor scheduledExecutor) throws IOException {
+
+       final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
+
+       LOG.info(
+           "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
+           FileArchivedExecutionGraphStore.class.getSimpleName(),
+           storageDirectory,
+           expirationTime.toMilliseconds(),
+           maximumCacheSizeBytes);
+
+       this.storageDir = Preconditions.checkNotNull(storageDirectory);
+       Preconditions.checkArgument(
+           storageDirectory.exists() 

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340919#comment-16340919
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
No, I only had hard crashes in mind. One could do a directory listing and 
delete old files.
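
The directory-listing cleanup suggested here could look roughly like the following sketch. It is only an illustration of the idea, not code from the pull request: the class StaleFileCleanupSketch, the method deleteFilesOlderThan, and the touch helper are hypothetical names, and file age is judged by last-modified time, which is an assumption.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch: remove leftover files after a hard crash by age. */
final class StaleFileCleanupSketch {

    /**
     * Deletes regular files directly inside dir whose last-modified time is
     * before cutoffMillis and returns how many files were deleted.
     */
    static int deleteFilesOlderThan(File dir, long cutoffMillis) {
        File[] files = dir.listFiles();
        if (files == null) {
            // dir does not exist, is not a directory, or could not be read
            return 0;
        }
        int deleted = 0;
        for (File file : files) {
            if (file.isFile() && file.lastModified() < cutoffMillis && file.delete()) {
                deleted++;
            }
        }
        return deleted;
    }

    /** Helper for experiments: creates an empty file with the given timestamp. */
    static File touch(File dir, String name, long lastModifiedMillis) {
        File file = new File(dir, name);
        try {
            file.createNewFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        file.setLastModified(lastModifiedMillis);
        return file;
    }
}
```

Such a pass could run once at startup with a cutoff of the current time minus the configured expiration time, so that files left behind by a crashed process are eventually removed.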




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340796#comment-16340796
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5310
  
I've rebased onto the latest master and addressed your comments @GJL with 
6eb11dd.




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340788#comment-16340788
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5310
  
Thanks for the review @GJL. Ideally we don't leave things around when no 
longer needed. Thus, which scenario other than a hard crash have you spotted 
that doesn't clean up the graphs?




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340772#comment-16340772
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164062610
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @BeforeClass
+   public static void setup() {
+       GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+       GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+       GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+   }
+
+   /**
+    * Tests that we can put {@link ArchivedExecutionGraph} into the
+    * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
+    */
+   @Test
+   public void testPut() throws IOException {
+       final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+       final File rootDir = temporaryFolder.newFolder();
+
+       try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
+
+           final File storageDirectory = executionGraphStore.getStorageDir();
+
+           // check that the storage directory is empty
+           assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
+
+           executionGraphStore.put(dummyExecutionGraph);
+
+           // check that we have persisted the given execution graph
+           assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
+
+           assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), ne

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340748#comment-16340748
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164058994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

Good point. Will change it.




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340737#comment-16340737
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164058010
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+   private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
--- End diff --

You're right, will change it.




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340730#comment-16340730
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164057811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+   private final File storageDir;
+
+   private final Cache<JobID, JobDetails> jobDetailsCache;
+
+   private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+
+   private final ScheduledFuture<?> cleanupFuture;
+
+   private final Thread shutdownHook;
+
+   private int numFinishedJobs;
+
+   private int numFailedJobs;
+
+   private int numCanceledJobs;
+
+   public FileArchivedExecutionGraphStore(
+           File rootDir,
+           Time expirationTime,
+           long maximumCacheSizeBytes,
+           ScheduledExecutor scheduledExecutor) throws IOException {
+
+       final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
+
+       LOG.info(
+           "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
+           FileArchivedExecutionGraphStore.class.getSimpleName(),
+           storageDirectory,
+           expirationTime.toMilliseconds(),
+           maximumCacheSizeBytes);
+
+       this.storageDir = Preconditions.checkNotNull(storageDirectory);
+       Preconditions.checkArgument(
+           storageDirectory.

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340733#comment-16340733
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164057896
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+   private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>();
+
+   @Override
+   public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+       return insertRunnable(command, false);
+   }
+
+   @Override
+   public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+       final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false);
+
+       scheduledTasks.offer(scheduledTask);
+
+       return scheduledTask;
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+       return insertRunnable(command, true);
+   }
+
+   @Override
+   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+       return insertRunnable(command, true);
+   }
+
+   /**
+    * Triggers all registered tasks.
+    */
+   public void triggerScheduledTasks() {
+       final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator();
+
+       while (iterator.hasNext()) {
+           final ScheduledTask<?> scheduledTask = iterator.next();
+
+           scheduledTask.execute();
+
+           if (!scheduledTask.isPeriodic) {
+               iterator.remove();
+           }
+       }
+   }
+
+   private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) {
+       final ScheduledTask<?> scheduledTask = new ScheduledTask<>(
+           () -> {
+               command.run();
+               return null;
+           },
+           isPeriodic);
+
+       scheduledTasks.offer(scheduledTask);
+
+       return scheduledTask;
+   }
+
+   private static final class ScheduledTask<T> implements ScheduledFuture<T> {
+
+       private final Callable<T> callable;
+
+       private final boolean isPeriodic;
+
+       private final CompletableFuture<T> result;
+
+       private ScheduledTask(Callable<T> callable, boolean isPeriodic) {
+           this.callable = Preconditions.checkNotNull(callable);
+           this.isPeriodic = isPeriodic;
+
+           this.result = new CompletableFuture<>();
+       }
+
+       public boolean isPeriodic() {
--- End diff --

Will remove it.


> Add SerializableExecutionGraphStore to D

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340723#comment-16340723
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r164056776
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
 
-    private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) {
-        final JobResult jobResult = JobResult.createFrom(accessExecutionGraph);
+    private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
+        Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), "");
--- End diff --

Good catch. Will write a proper error message.




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339594#comment-16339594
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
Is it acceptable that graphs sometimes don't get deleted from disk?




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339570#comment-16339570
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163918048
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+    private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
--- End diff --

It looks like a constant, i.e., it shouldn't be mutable.
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS =
    Collections.unmodifiableList(
        Arrays.stream(JobStatus.values())
            .filter(JobStatus::isGloballyTerminalState)
            .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo.





[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339572#comment-16339572
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919241
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+    private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+   private static final Random RANDOM = new Random();
--- End diff --

With `ThreadLocalRandom.current().nextInt(...)` you already have an 
available random instance which does not suffer from lock contention problems.
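
A minimal sketch of that suggestion, assuming the test only needs to pick a random terminal state. The class name `RandomStatusPicker` and the string stand-ins for `JobStatus` values are hypothetical and only keep the example self-contained; they are not part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class RandomStatusPicker {

    // Hypothetical stand-in for the list of globally terminal job states.
    static final List<String> TERMINAL_STATES =
        Collections.unmodifiableList(Arrays.asList("FINISHED", "FAILED", "CANCELED"));

    // Picks a uniformly random element using the calling thread's own generator,
    // so no shared Random instance is needed.
    static <T> T pickRandom(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        return items.get(ThreadLocalRandom.current().nextInt(items.size()));
    }
}
```

With this shape the `RANDOM` field can be dropped, since `ThreadLocalRandom.current()` is obtained at the call site.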




[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339573#comment-16339573
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163922121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+    private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+    private static final Random RANDOM = new Random();
+
+    @ClassRule
+    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+    }
+
+    /**
+     * Tests that we can put {@link ArchivedExecutionGraph} into the
+     * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
+     */
+    @Test
+    public void testPut() throws IOException {
+        final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+        final File rootDir = temporaryFolder.newFolder();
+
+        try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
+
+            final File storageDirectory = executionGraphStore.getStorageDir();
+
+            // check that the storage directory is empty
+            assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
+
+            executionGraphStore.put(dummyExecutionGraph);
+
+            // check that we have persisted the given execution graph
+            assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
+
+            assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new Partia

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339569#comment-16339569
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163919921
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
 ---
@@ -0,0 +1,319 @@
+/**
+ * Tests for the {@link FileArchivedExecutionGraphStore}.
+ */
+@Category(Flip6.class)
+public class FileArchivedExecutionGraphStoreTest extends TestLogger {
+
+    private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3);
+
+    private static final Random RANDOM = new Random();
+
+    @ClassRule
+    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setup() {
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED);
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED);
+        GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED);
+    }
+
+    /**
+     * Tests that we can put {@link ArchivedExecutionGraph} into the
+     * {@link FileArchivedExecutionGraphStore} and that the graph is persisted.
+     */
+    @Test
+    public void testPut() throws IOException {
+        final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
+        final File rootDir = temporaryFolder.newFolder();
+
+        try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) {
+
+            final File storageDirectory = executionGraphStore.getStorageDir();
+
+            // check that the storage directory is empty
+            assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
+
+            executionGraphStore.put(dummyExecutionGraph);
+
+            // check that we have persisted the given execution graph
+            assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
+
+            assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new Partia

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339571#comment-16339571
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163915273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
+ * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
+ * the stored execution graphs are periodically cleaned up.
+ */
+public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
+
+    private final File storageDir;
+
+    private final Cache<JobID, JobDetails> jobDetailsCache;
+
+    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
+
+    private final ScheduledFuture<?> cleanupFuture;
+
+    private final Thread shutdownHook;
+
+    private int numFinishedJobs;
+
+    private int numFailedJobs;
+
+    private int numCanceledJobs;
+
+    public FileArchivedExecutionGraphStore(
+            File rootDir,
+            Time expirationTime,
+            long maximumCacheSizeBytes,
+            ScheduledExecutor scheduledExecutor) throws IOException {
+
+        final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
+
+        LOG.info(
+            "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
+            FileArchivedExecutionGraphStore.class.getSimpleName(),
+            storageDirectory,
+            expirationTime.toMilliseconds(),
+            maximumCacheSizeBytes);
+
+        this.storageDir = Preconditions.checkNotNull(storageDirectory);
+        Preconditions.checkArgument(
+            storageDirectory.exists() 

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339567#comment-16339567
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163921620
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutor} implementation for testing purposes.
+ */
+public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor {
+
+    private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>();
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return insertRunnable(command, false);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false);
+
+        scheduledTasks.offer(scheduledTask);
+
+        return scheduledTask;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return insertRunnable(command, true);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return insertRunnable(command, true);
+    }
+
+    /**
+     * Triggers all registered tasks.
+     */
+    public void triggerScheduledTasks() {
+        final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator();
+
+        while (iterator.hasNext()) {
+            final ScheduledTask<?> scheduledTask = iterator.next();
+
+            scheduledTask.execute();
+
+            if (!scheduledTask.isPeriodic) {
+                iterator.remove();
+            }
+        }
+    }
+
+    private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) {
+        final ScheduledTask<?> scheduledTask = new ScheduledTask<>(
+            () -> {
+                command.run();
+                return null;
+            },
+            isPeriodic);
+
+        scheduledTasks.offer(scheduledTask);
+
+        return scheduledTask;
+    }
+
+    private static final class ScheduledTask<T> implements ScheduledFuture<T> {
+
+        private final Callable<T> callable;
+
+        private final boolean isPeriodic;
+
+        private final CompletableFuture<T> result;
+
+        private ScheduledTask(Callable<T> callable, boolean isPeriodic) {
+            this.callable = Preconditions.checkNotNull(callable);
+            this.isPeriodic = isPeriodic;
+
+            this.result = new CompletableFuture<>();
+        }
+
+        public boolean isPeriodic() {
--- End diff --

nit: method is unused


> Add SerializableExecutionGraphStore to Disp

[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339568#comment-16339568
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5310#discussion_r163911570
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}
 
-    private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) {
-        final JobResult jobResult = JobResult.createFrom(accessExecutionGraph);
+    private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
+        Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), "");
--- End diff --

The `errorMessage` is an empty string. Leave it out completely or put 
something meaningful. 
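
For illustration, a sketch of what a meaningful message could look like. The `checkArgument` helper and the `Status` enum below are local, hypothetical stand-ins that keep the example self-contained; they are not Flink's `Preconditions` or `JobStatus`, and the message wording is only a suggestion.

```java
final class TerminalStateCheck {

    // Simplified, hypothetical stand-in for a job status enum.
    enum Status {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        private final boolean globallyTerminal;

        Status(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }

        boolean isGloballyTerminalState() {
            return globallyTerminal;
        }
    }

    // Local stand-in for a checkArgument-style helper with a message template.
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    // Fails with a message that names the job and the offending state.
    static void requireGloballyTerminal(String jobName, Status status) {
        checkArgument(
            status.isGloballyTerminalState(),
            "Job %s is in state %s, which is not globally terminal.",
            jobName,
            status);
    }
}
```

The point is that the exception names both the job and its actual state, so a failure can be diagnosed from the log alone.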







[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330684#comment-16330684
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5310

[FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher

## What is the purpose of the change

The SerializableExecutionGraphStore is responsible for storing completed jobs
for historic job requests (e.g. from the web UI or from the client). The store
is populated by the Dispatcher once a job has terminated.

The FileSerializableExecutionGraphStore implementation persists all
SerializableExecutionGraphs on disk in order to avoid OOM problems. It keeps
only some of the stored graphs in memory, up to a configurable size. Once the
in-memory graphs come close to this size, the store evicts them and reloads
them only if they are requested again. Additionally, the
FileSerializableExecutionGraphStore defines an expiration time after which the
execution graphs are removed from disk. This prevents excessive use of disk
resources.
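
The behavior described above (persist everything to disk, keep a bounded subset in memory, reload on demand, expire old entries) can be sketched roughly as follows. This is not the PR's implementation: the class name `FileBackedStoreSketch`, the entry-count limit (instead of a size in bytes), the explicit `nowMillis` parameters, and the `.bin` file naming are all assumptions made to keep the example small and deterministic.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: every entry goes to disk; memory holds a bounded LRU subset. */
class FileBackedStoreSketch<V extends Serializable> {

    private final Path directory;
    private final long expirationMillis;
    private final Map<String, Long> storedAt = new LinkedHashMap<>(); // id -> write time
    private final LinkedHashMap<String, V> cache;                      // access-ordered

    FileBackedStoreSketch(Path directory, int maxCachedEntries, long expirationMillis) {
        this.directory = directory;
        this.expirationMillis = expirationMillis;
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxCachedEntries; // evicts from memory only; the file stays
            }
        };
    }

    /** Writes the value to disk first, then caches it in memory. */
    void put(String id, V value, long nowMillis) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(fileFor(id)))) {
            out.writeObject(value);
        }
        storedAt.put(id, nowMillis);
        cache.put(id, value);
    }

    /** Returns the cached value, reloading it from disk if evicted; null if unknown. */
    @SuppressWarnings("unchecked")
    V get(String id) throws IOException, ClassNotFoundException {
        V cached = cache.get(id);
        if (cached != null || !storedAt.containsKey(id)) {
            return cached;
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(fileFor(id)))) {
            V loaded = (V) in.readObject();
            cache.put(id, loaded);
            return loaded;
        }
    }

    /** Removes entries at least expirationMillis old from both disk and memory. */
    void expire(long nowMillis) throws IOException {
        Iterator<Map.Entry<String, Long>> it = storedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= expirationMillis) {
                Files.deleteIfExists(fileFor(entry.getKey()));
                cache.remove(entry.getKey());
                it.remove();
            }
        }
    }

    boolean isCached(String id) {
        return cache.containsKey(id);
    }

    private Path fileFor(String id) {
        return directory.resolve(id + ".bin");
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("graph-store-sketch");
        FileBackedStoreSketch<String> store = new FileBackedStoreSketch<>(dir, 1, 1_000L);
        store.put("job-a", "graph A", 0L);
        store.put("job-b", "graph B", 500L);         // pushes job-a out of memory
        System.out.println(store.isCached("job-a")); // false
        System.out.println(store.get("job-a"));      // graph A, reloaded from disk
        store.expire(1_200L);                         // job-a is expired, job-b is kept
        System.out.println(store.get("job-a"));      // null
    }
}
```

Passing the clock value in explicitly is a choice made for this sketch so the expiration behavior can be exercised without sleeping; a real store would more likely run expiry on a timer.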

This PR is based on #5309.

## Brief change log

- Introduce `SerializableExecutionGraphStore` and `FileSerializableExecutionGraphStore`
- Add `FileSerializableExecutionGraphStore` to `Dispatcher`
- Store `SerializableExecutionGraphs` in corresponding `FileSerializableExecutionGraphStore`
- Adapt `Dispatcher` to serve requests for historic jobs

## Verifying this change

- Added `FileSerializableExecutionGraphStoreTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

cc @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addHistoricJobView

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5310


commit a959b9411833e320065b328ed2fc936b58f911f4
Author: Till Rohrmann 
Date:   2018-01-16T17:45:53Z

[FLINK-8449] [flip6] Extend OnCompletionActions to accept a SerializableExecutionGraph

This commit introduces the SerializableExecutionGraph, which extends the
AccessExecutionGraph and adds serializability to it. Moreover, this commit
changes the OnCompletionActions interface such that it accepts a
SerializableExecutionGraph instead of a plain JobResult. This allows the
completed ExecutionGraph to be archived for further use in the container
component of the JobMasterRunner.

commit ca15b076c05ff940a12a240ba385e2434f93790b
Author: Till Rohrmann 
Date:   2018-01-18T14:02:36Z

[hotfix] [tests] Let BucketingSink extend TestLogger

commit 21c25502fb6d07c6fb65f18100dc6d4ec23e9d93
Author: Till Rohrmann 
Date:   2018-01-17T14:01:57Z

[FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#requestJob type safe

Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a
CompletableFuture instead of a
CompletableFuture. In order to support the old code
and the JobManagerGateway implementation we have to keep the return type
in RestfulGateway. Once the old code has been removed, we should change
this as well.

commit 7b7b0692582189b8e540e5ae022d351c45991e43
Author: Till Rohrmann 
Date:   2018-01-17T11:22:43Z

[FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher

The SerializableExecutionGraphStore is responsible for storing completed jobs
for historic job requests (e.g. from the web UI or from the client). The store
is populated by the Dispatcher once a job has terminated.

The FileSerializableExecutionGraphStore implementation persists all
SerializableExecutionGraphs on disk in order to avoid OOM problems. It keeps
only some of the stored graphs in memory, up to a configurable size. Once the
in-memory graphs come close to this size, the store evicts them and reloads
them only if they are requested again. Additionally, the
FileSerializableExecutionGraphStore defines
an expiration time afte