[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341126#comment-16341126 ] ASF GitHub Bot commented on FLINK-8453: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5310 > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341114#comment-16341114 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 👍 > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340925#comment-16340925 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164087390 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. + */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists()
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340919#comment-16340919 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 No, I only had hard crashes in mind. One could do a directory listing and delete old files. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340796#comment-16340796 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5310 I've rebased onto the latest master and addressed your comments @GJL with 6eb11dd. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340788#comment-16340788 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5310 Thanks for the review @GJL. Ideally we don't leave things around when no longer needed. Thus, which scenario other than a hard crash have you spotted that doesn't clean up the graphs? > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340772#comment-16340772 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164062610 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), ne
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340748#comment-16340748 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164058994 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- Good point. Will change it. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340737#comment-16340737 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164058010 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- You're right, will change it. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340730#comment-16340730 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164057811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. + */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340733#comment-16340733 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164057896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. + */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + final ScheduledTask scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** +* Triggers all registered tasks. +*/ + public void triggerScheduledTasks() { + final Iterator> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask implements ScheduledFuture { + + private final Callable callable; + + private final boolean isPeriodic; + + private final CompletableFuture result; + + private ScheduledTask(Callable callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public boolean isPeriodic() { --- End diff -- Will remove it. > Add SerializableExecutionGraphStore to D
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340723#comment-16340723 ] ASF GitHub Bot commented on FLINK-8453: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164056776 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- Good catch. Will write a proper error message. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339594#comment-16339594 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5310 Is it acceptable behavior that sometimes Graphs don't get deleted from disk? > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339570#comment-16339570 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163918048 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- It looks like a constant, i.e., it shouldn't be mutable. ``` private static final List GLOBALLY_TERMINAL_JOB_STATUS = Collections.unmodifiableList( Arrays.stream(JobStatus.values()) .filter(JobStatus::isGloballyTerminalState) .collect(Collectors.toList())); ``` Using `@BeforeClass` is not idiomatic imo. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339572#comment-16339572 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919241 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- With `ThreadLocalRandom.current().nextInt(...)` you already have an available random instance which does not suffer from lock contention problems. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339573#comment-16339573 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163922121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new Partia
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339569#comment-16339569 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919921 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new Partia
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339571#comment-16339571 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163915273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. + */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists()
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339567#comment-16339567 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163921620 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. + */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + final ScheduledTask scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** +* Triggers all registered tasks. +*/ + public void triggerScheduledTasks() { + final Iterator> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask implements ScheduledFuture { + + private final Callable callable; + + private final boolean isPeriodic; + + private final CompletableFuture result; + + private ScheduledTask(Callable callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public boolean isPeriodic() { --- End diff -- nit: method is unused > Add SerializableExecutionGraphStore to Disp
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339568#comment-16339568 ] ASF GitHub Bot commented on FLINK-8453: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163911570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- The `errorMessage` is an empty string. Leave it out completely or put something meaningful. > Add SerializableExecutionGraphStore to Dispatcher > - > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in memory > instances once they grow to big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330684#comment-16330684 ] ASF GitHub Bot commented on FLINK-8453: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5310 [FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher ## What is the purpose of the change The SerializableExecutionGraphStore is responsible for storing completed jobs for historic job requests (e.g. from the web ui or from the client). The store is populated by the Dispatcher once a job has terminated. The FileSerializableExecutionGraphStore implementation persists all SerializableExecutionGraphs on disk in order to avoid OOM problems. It only keeps some of the stored graphs in memory until it reaches a configurable size. Once coming close to this size, it will evict the elements and only reload them if requested again. Additionally, the FileSerializableExecutionGraphStore defines an expiration time after which the execution graphs will be removed from disk. This prevents excessive use of disk resources. This PR is based on #5309. ## Brief change log - Introduce `SerializableExecutionGraphStore` and `FileSerializableExecutionGraphStore` - Add `FileSerializableExecutionGraphStore` to `Dispatcher` - Store `SerializableExecutionGraphs` in corresponding `FileSerializableExecutionGraphStore` - Adapt `Dispatcher` to serve requests for historic jobs ## Verifying this change - Added `FileSerializableExecutionGraphStoreTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addHistoricJobView Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5310.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5310 commit a959b9411833e320065b328ed2fc936b58f911f4 Author: Till Rohrmann Date: 2018-01-16T17:45:53Z [FLINK-8449] [flip6] Extend OnCompletionActions to accept an SerializableExecutionGraph This commit introduces the SerializableExecutionGraph which extends the AccessExecutionGraph and adds serializability to it. Moreover, this commit changes the OnCompletionActions interface such that it accepts a SerializableExecutionGraph instead of a plain JobResult. This allows to archive the completed ExecutionGraph for further usage in the container component of the JobMasterRunner. commit ca15b076c05ff940a12a240ba385e2434f93790b Author: Till Rohrmann Date: 2018-01-18T14:02:36Z [hotfix] [tests] Let BucketingSink extend TestLogger commit 21c25502fb6d07c6fb65f18100dc6d4ec23e9d93 Author: Till Rohrmann Date: 2018-01-17T14:01:57Z [FLINK-8450] [flip6] Make JobMaster/DispatcherGateway#requestJob type safe Let JobMasterGateway#requestJob and DispatcherGateway#requestJob return a CompletableFuture instead of a CompletableFuture. In order to support the old code and the JobManagerGateway implementation we have to keep the return type in RestfulGateway. Once the old code has been removed, we should change this as well. commit 7b7b0692582189b8e540e5ae022d351c45991e43 Author: Till Rohrmann Date: 2018-01-17T11:22:43Z [FLINK-8453] [flip6] Add SerializableExecutionGraphStore to Dispatcher The SerializableExecutionGraphStore is responsible for storing completed jobs for historic job requests (e.g. from the web ui or from the client). The store is populated by the Dispatcher once a job has terminated. The FileSerializableExecutionGraphStore implementation persists all SerializableExecutionGraphs on disk in order to avoid OOM problems. It only keeps some of the stored graphs in memory until it reaches a configurable size. Once coming close to this size, it will evict the elements and only reload them if requested again. Additionally, the FileSerializableExecutionGraphStore defines an expiration time afte