[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5310 ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164087390 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. 
+ */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists() && storageDirectory.isDirectory(), + "The storage directory must exist and be a directory."); + this.jobDetailsCache = CacheBuilder.newBuilder() + .expireAfterWrite(expirationTime.toMilliseconds(),
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164062610 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); + } + } + + /** +* Tests that null is returned if we request an unknown JobID. +*/ + @Test + public void testUnknownGet() throws IOExce
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164058994 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- Good point. Will change it. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164058010 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- You're right, will change it. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164057896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. 
+ */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + final ScheduledTask scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** +* Triggers all registered tasks. +*/ + public void triggerScheduledTasks() { + final Iterator> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask implements ScheduledFuture { + + private final Callable callable; + + private final boolean isPeriodic; + + private final CompletableFuture result; + + private ScheduledTask(Callable callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public boolean isPeriodic() { --- End diff -- Will 
remove it. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164057811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. 
+ */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists() && storageDirectory.isDirectory(), + "The storage directory must exist and be a directory."); + this.jobDetailsCache = CacheBuilder.newBuilder() + .expireAfterWrite(expirationTime.toMillis
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r164056776 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- Good catch. Will write a proper error message. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919241 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); --- End diff -- With `ThreadLocalRandom.current().nextInt(...)` you already have an available random instance which does not suffer from lock contention problems. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163918048 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); --- End diff -- It looks like a constant, i.e., it shouldn't be mutable. 
```
private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = Collections.unmodifiableList(
    Arrays.stream(JobStatus.values())
        .filter(JobStatus::isGloballyTerminalState)
        .collect(Collectors.toList()));
```
Using `@BeforeClass` is not idiomatic imo. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163915273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. 
+ */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedExecutionGraphCache; + + private final ScheduledFuture cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists() && storageDirectory.isDirectory(), + "The storage directory must exist and be a directory."); + this.jobDetailsCache = CacheBuilder.newBuilder() + .expireAfterWrite(expirationTime.toMilliseconds(),
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163911570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -486,11 +510,22 @@ private void onFatalError(Throwable throwable) { fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), ""); --- End diff -- The `errorMessage` is an empty string. Leave it out completely or put something meaningful. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163921620 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. 
+ */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + final ScheduledTask scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** +* Triggers all registered tasks. +*/ + public void triggerScheduledTasks() { + final Iterator> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask implements ScheduledFuture { + + private final Callable callable; + + private final boolean isPeriodic; + + private final CompletableFuture result; + + private ScheduledTask(Callable callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public boolean isPeriodic() { --- End diff -- nit: 
the `isPeriodic()` method is unused; `triggerScheduledTasks()` reads the `isPeriodic` field directly. ---
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163922121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); + } + } + + /** +* Tests that null is returned if we request an unknown JobID. +*/ + @Test + public void testUnknownGet() throws IOException {
[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5310#discussion_r163919921 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. 
+ */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List GLOBALLY_TERMINAL_JOB_STATUS = new ArrayList<>(3); + + private static final Random RANDOM = new Random(); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() { + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FINISHED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.FAILED); + GLOBALLY_TERMINAL_JOB_STATUS.add(JobStatus.CANCELED); + } + + /** +* Tests that we can put {@link ArchivedExecutionGraph} into the +* {@link FileArchivedExecutionGraphStore} and that the graph is persisted. +*/ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); --- End diff -- It is not obvious what the matcher is doing. How about: `assertThat(...), isPredicateFulfilled(..))` ``` private static Matcher isPredicateFulfilled(Ar