[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340772#comment-16340772 ]
ASF GitHub Bot commented on FLINK-8453: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164062610 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** + * Tests that we can put {@link ArchivedExecutionGraph} into the + * {@link FileArchivedExecutionGraphStore} and that the graph is persisted. + */ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); + } + } + + /** + * Tests that null is returned if we request an unknown JobID. 
+ */ + @Test + public void testUnknownGet() throws IOException { + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); + } + } + + /** + * Tests that we obtain the correct jobs overview. + */ + @Test + public void testStoredJobsOverview() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs); + + final List<JobStatus> jobStatuses = executionGraphs.stream().map(ArchivedExecutionGraph::getState).collect(Collectors.toList()); + + final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); + + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + for (ArchivedExecutionGraph executionGraph : executionGraphs) { + executionGraphStore.put(executionGraph); + } + + assertThat(executionGraphStore.getStoredJobsOverview(), Matchers.equalTo(expectedJobsOverview)); + } + } + + /** + * Tests that we obtain the correct collection of available job details. 
+ */ + @Test + public void testAvailableJobDetails() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs); + + final Collection<JobDetails> jobDetails = executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList()); + + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + for (ArchivedExecutionGraph executionGraph : executionGraphs) { + executionGraphStore.put(executionGraph); + } + + assertThat(executionGraphStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(jobDetails.toArray())); + } + } + + /** + * Tests that an expired execution graph is removed from the execution graph store. + */ + @Test + public void testExecutionGraphExpiration() throws Exception { + final File rootDir = temporaryFolder.newFolder(); + + final Time expirationTime = Time.milliseconds(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore( + rootDir, + expirationTime, + 10000L, + scheduledExecutor)) { + + final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + + executionGraphStore.put(executionGraph); + + // there should one execution graph + assertThat(executionGraphStore.size(), Matchers.equalTo(1)); + + Thread.sleep(expirationTime.toMilliseconds()); --- End diff -- Will change it. 
> Add SerializableExecutionGraphStore to Dispatcher > ------------------------------------------------- > > Key: FLINK-8453 > URL: https://issues.apache.org/jira/browse/FLINK-8453 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, REST > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it > can use to store completed jobs. This store can then be used to serve > historic job requests from the web UI, for example. The default > implementation should persist the jobs to disk and evict the in-memory > instances once they grow too big in order to avoid memory leaks. Additionally, > the store should expire elements from disk after a user-defined time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)