[FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to Dispatcher The ArchivedExecutionGraphStore is responsible for storing completed jobs for historic job requests (e.g. from the web ui or from the client). The store is populated by the Dispatcher once a job has terminated.
The FileArchivedExecutionGraphStore implementation persists all ArchivedExecutionGraphs on disk in order to avoid OOM problems. It only keeps some of the stored graphs in memory until it reaches a configurable size. Once coming close to this size, it will evict the elements and only reload them if requested again. Additionally, the FileArchivedExecutionGraphStore defines an expiration time after which the execution graphs will be removed from disk. This prevents excessive use of disk resources. This closes #5310. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b817f0f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b817f0f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b817f0f Branch: refs/heads/master Commit: 8b817f0f9f0ec55f040b56f2d65c62761eac1ac1 Parents: 60b7b03 Author: Till Rohrmann <[email protected]> Authored: Fri Jan 19 14:20:03 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 13:50:11 2018 +0100 ---------------------------------------------------------------------- docs/ops/config.md | 4 + .../flink/configuration/JobManagerOptions.java | 17 + .../apache/flink/runtime/blob/BlobUtils.java | 2 +- .../dispatcher/ArchivedExecutionGraphStore.java | 74 ++++ .../flink/runtime/dispatcher/Dispatcher.java | 56 ++- .../FileArchivedExecutionGraphStore.java | 301 +++++++++++++++++ .../dispatcher/StandaloneDispatcher.java | 2 + .../entrypoint/SessionClusterEntrypoint.java | 40 +++ .../flink/runtime/jobmaster/JobMaster.java | 5 +- .../messages/webmonitor/ClusterOverview.java | 51 +-- .../messages/webmonitor/JobsOverview.java | 49 ++- .../ManuallyTriggeredScheduledExecutor.java | 163 +++++++++ .../runtime/dispatcher/DispatcherTest.java | 3 + .../FileArchivedExecutionGraphStoreTest.java | 337 +++++++++++++++++++ .../MemoryArchivedExecutionGraphStore.java | 80 +++++ 15 files changed, 1130 insertions(+), 54 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index 49e0a35..8a4b1c1 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -242,6 +242,10 @@ Default value is `1 minute`. - `restart-strategy.failure-rate.delay`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate". Default value is the `akka.ask.timeout`. +- `jobstore.cache-size`: The job store cache size in bytes which is used to keep completed jobs in memory (DEFAULT: `52428800` (`50` MB)). + +- `jobstore.expiration-time`: The time in seconds after which a completed job expires and is purged from the job store (DEFAULT: `3600`). + ## Full Reference ### HDFS http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 1e22a24..9f61736 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -96,6 +96,23 @@ public class JobManagerOptions { key("jobmanager.archive.fs.dir") .noDefaultValue(); + /** + * The job store cache size in bytes which is used to keep completed + * jobs in memory. 
+ */ + public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE = + key("jobstore.cache-size") + .defaultValue(50L * 1024L * 1024L) + .withDescription("The job store cache size in bytes which is used to keep completed jobs in memory."); + + /** + * The time in seconds after which a completed job expires and is purged from the job store. + */ + public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME = + key("jobstore.expiration-time") + .defaultValue(60L * 60L) + .withDescription("The time in seconds after which a completed job expires and is purged from the job store."); + // --------------------------------------------------------------------------------------------- private JobManagerOptions() { http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 04d3366..3273e1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -306,7 +306,7 @@ public class BlobUtils { /** * Adds a shutdown hook to the JVM and returns the Thread, which has been registered. 
*/ - static Thread addShutdownHook(final Closeable service, final Logger logger) { + public static Thread addShutdownHook(final Closeable service, final Logger logger) { checkNotNull(service); checkNotNull(logger); http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java new file mode 100644 index 0000000..6f5df53 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedExecutionGraphStore.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for a {@link ArchivedExecutionGraph} store. + */ +public interface ArchivedExecutionGraphStore extends Closeable { + + /** + * Returns the current number of stored {@link ArchivedExecutionGraph}. + * + * @return Current number of stored {@link ArchivedExecutionGraph} + */ + int size(); + + /** + * Get the {@link ArchivedExecutionGraph} for the given job id. Null if it isn't stored. + * + * @param jobId identifying the serializable execution graph to retrieve + * @return The stored serializable execution graph or null + */ + @Nullable + ArchivedExecutionGraph get(JobID jobId); + + /** + * Store the given {@link ArchivedExecutionGraph} in the store. + * + * @param archivedExecutionGraph to store + * @throws IOException if the serializable execution graph could not be stored in the store + */ + void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException; + + /** + * Return the {@link JobsOverview} for all stored/past jobs. + * + * @return Jobs overview for all stored/past jobs + */ + JobsOverview getStoredJobsOverview(); + + /** + * Return the collection of {@link JobDetails} of all currently stored jobs. 
+ * + * @return Collection of job details of all currently stored jobs + */ + Collection<JobDetails> getAvailableJobDetails(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index a5c8961..e930450 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -47,6 +46,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.JobExecutionResultGoneException; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -102,6 +102,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme private final JobExecutionResultCache jobExecutionResultCache = new 
JobExecutionResultCache(); + private final ArchivedExecutionGraphStore archivedExecutionGraphStore; + @Nullable protected final String restAddress; @@ -114,6 +116,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, + ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { super(rpcService, endpointId); @@ -136,6 +139,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService(); this.restAddress = restAddress; + + this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore); } //------------------------------------------------------ @@ -307,10 +312,15 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme CompletableFuture<Collection<JobStatus>> allJobsFuture = FutureUtils.combineAll(jobStatus); + final JobsOverview completedJobsOverview = archivedExecutionGraphStore.getStoredJobsOverview(); + return allJobsFuture.thenCombine( taskManagerOverviewFuture, - (Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) -> - ClusterOverview.create(resourceOverview, allJobsStatus)); + (Collection<JobStatus> runningJobsStatus, ResourceOverview resourceOverview) -> { + final JobsOverview allJobsOverview = JobsOverview.create(runningJobsStatus).combine(completedJobsOverview); + return new ClusterOverview(resourceOverview, allJobsOverview); + }); + } @Override @@ -325,9 +335,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme CompletableFuture<Collection<JobDetails>> combinedJobDetails = FutureUtils.combineAll(individualJobDetails); + final Collection<JobDetails> completedJobDetails = archivedExecutionGraphStore.getAvailableJobDetails(); + return 
combinedJobDetails.thenApply( - (Collection<JobDetails> jobDetails) -> - new MultipleJobsDetails(jobDetails)); + (Collection<JobDetails> runningJobDetails) -> { + final Collection<JobDetails> allJobDetails = new ArrayList<>(completedJobDetails.size() + runningJobDetails.size()); + allJobDetails.addAll(runningJobDetails); + allJobDetails.addAll(completedJobDetails); + return new MultipleJobsDetails(allJobDetails); + }); + } @Override @@ -335,7 +352,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); if (jobManagerRunner == null) { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId); + + // check whether it is a completed job + if (serializableExecutionGraph == null) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(serializableExecutionGraph); + } } else { return jobManagerRunner.getJobManagerGateway().requestJob(jobId, timeout); } @@ -486,11 +510,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme fatalErrorHandler.onFatalError(throwable); } - private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { - final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + private void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { + Preconditions.checkArgument( + archivedExecutionGraph.getState().isGloballyTerminalState(), + "Job " + archivedExecutionGraph.getJobID() + " is in state " + + archivedExecutionGraph.getState() + " which is not globally terminal."); + final JobResult jobResult = JobResult.createFrom(archivedExecutionGraph); jobExecutionResultCache.put(jobResult); - final JobID jobId = accessExecutionGraph.getJobID(); + final JobID 
jobId = archivedExecutionGraph.getJobID(); + + try { + archivedExecutionGraphStore.put(archivedExecutionGraph); + } catch (IOException e) { + log.info( + "Could not store completed job {}({}).", + archivedExecutionGraph.getJobName(), + archivedExecutionGraph.getJobID(), + e); + } try { removeJob(jobId, true); http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java new file mode 100644 index 0000000..8db4fac --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover, + * the stored execution graphs are periodically cleaned up. 
+ */ +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class); + + private final File storageDir; + + private final Cache<JobID, JobDetails> jobDetailsCache; + + private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache; + + private final ScheduledFuture<?> cleanupFuture; + + private final Thread shutdownHook; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedExecutionGraphStore( + File rootDir, + Time expirationTime, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor, + Ticker ticker) throws IOException { + + final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedExecutionGraphStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMilliseconds(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists() && storageDirectory.isDirectory(), + "The storage directory must exist and be a directory."); + this.jobDetailsCache = CacheBuilder.newBuilder() + .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS) + .removalListener( + (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey())) + .ticker(ticker) + .build(); + + this.archivedExecutionGraphCache = CacheBuilder.newBuilder() + .maximumWeight(maximumCacheSizeBytes) + .weigher(this::calculateSize) + .build(new CacheLoader<JobID, ArchivedExecutionGraph>() { + @Override + public ArchivedExecutionGraph load(JobID jobId) throws Exception { + return loadExecutionGraph(jobId); + }}); + + this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay( + jobDetailsCache::cleanUp, 
+ expirationTime.toMilliseconds(), + expirationTime.toMilliseconds(), + TimeUnit.MILLISECONDS); + + this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + + this.numFinishedJobs = 0; + this.numFailedJobs = 0; + this.numCanceledJobs = 0; + } + + @Override + public int size() { + return Math.toIntExact(jobDetailsCache.size()); + } + + @Override + @Nullable + public ArchivedExecutionGraph get(JobID jobId) { + try { + return archivedExecutionGraphCache.get(jobId); + } catch (ExecutionException e) { + LOG.debug("Could not load archived execution graph for job id {}.", jobId, e); + return null; + } + } + + @Override + public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException { + final JobStatus jobStatus = archivedExecutionGraph.getState(); + final JobID jobId = archivedExecutionGraph.getJobID(); + final String jobName = archivedExecutionGraph.getJobName(); + + Preconditions.checkArgument( + jobStatus.isGloballyTerminalState(), + "The job " + jobName + '(' + jobId + + ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.'); + + switch (jobStatus) { + case FINISHED: + numFinishedJobs++; + break; + case CANCELED: + numCanceledJobs++; + break; + case FAILED: + numFailedJobs++; + break; + default: + throw new IllegalStateException("The job " + jobName + '(' + + jobId + ") should have been in a globally terminal state. 
" + + "Instead it was in state " + jobStatus + '.'); + } + + // write the ArchivedExecutionGraph to disk + storeArchivedExecutionGraph(archivedExecutionGraph); + + final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph); + + jobDetailsCache.put(jobId, detailsForJob); + archivedExecutionGraphCache.put(jobId, archivedExecutionGraph); + } + + @Override + public JobsOverview getStoredJobsOverview() { + return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs); + } + + @Override + public Collection<JobDetails> getAvailableJobDetails() { + return jobDetailsCache.asMap().values(); + } + + @Override + public void close() throws IOException { + cleanupFuture.cancel(false); + + jobDetailsCache.invalidateAll(); + + // clean up the storage directory + FileUtils.deleteFileOrDirectory(storageDir); + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the + // shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t); + } + } + } + + // -------------------------------------------------------------- + // Internal methods + // -------------------------------------------------------------- + + private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) { + final File archivedExecutionGraphFile = getExecutionGraphFile(jobId); + + if (archivedExecutionGraphFile.exists()) { + return Math.toIntExact(archivedExecutionGraphFile.length()); + } else { + LOG.debug("Could not find archived execution graph file for {}. 
Estimating the size instead.", jobId); + return serializableExecutionGraph.getAllVertices().size() * 1000 + + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000; + } + } + + private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException { + final File archivedExecutionGraphFile = getExecutionGraphFile(jobId); + + if (archivedExecutionGraphFile.exists()) { + try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) { + return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader()); + } + } else { + throw new FileNotFoundException("Could not find file for archived execution graph " + jobId + + ". This indicates that the file either has been deleted or never written."); + } + } + + private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException { + final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID()); + + try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) { + InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph); + } + } + + private File getExecutionGraphFile(JobID jobId) { + return new File(storageDir, jobId.toString()); + } + + private void deleteExecutionGraphFile(JobID jobId) { + Preconditions.checkNotNull(jobId); + + final File archivedExecutionGraphFile = getExecutionGraphFile(jobId); + + try { + FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile); + } catch (IOException e) { + LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e); + } + + archivedExecutionGraphCache.invalidate(jobId); + jobDetailsCache.invalidate(jobId); + } + + private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException { + final int maxAttempts = 10; + + for (int attempt = 0; attempt < maxAttempts; attempt++) { + final File storageDirectory = new File(tmpDir, "executionGraphStore-" 
+ UUID.randomUUID()); + + if (storageDirectory.mkdir()) { + return storageDirectory; + } + } + + throw new IOException("Could not create executionGraphStorage directory in " + tmpDir + '.'); + } + + // -------------------------------------------------------------- + // Testing methods + // -------------------------------------------------------------- + + @VisibleForTesting + File getStorageDir() { + return storageDir; + } + + @VisibleForTesting + LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() { + return archivedExecutionGraphCache; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java index c64c883..b71b6bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java @@ -50,6 +50,7 @@ public class StandaloneDispatcher extends Dispatcher { BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, + ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { super( @@ -61,6 +62,7 @@ public class StandaloneDispatcher extends Dispatcher { blobServer, heartbeatServices, metricRegistry, + archivedExecutionGraphStore, fatalErrorHandler, restAddress); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 0628281..0b1cea0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -20,13 +20,18 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; +import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -47,10 +52,14 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.shaded.guava18.com.google.common.base.Ticker; + import akka.actor.ActorSystem; import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; import java.util.concurrent.Executor; /** @@ -68,6 +77,8 @@ public abstract class 
SessionClusterEntrypoint extends ClusterEntrypoint { private DispatcherRestEndpoint dispatcherRestEndpoint; + private ArchivedExecutionGraphStore archivedExecutionGraphStore; + public SessionClusterEntrypoint(Configuration configuration) { super(configuration); } @@ -81,6 +92,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { + archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, rpcService.getScheduledExecutor()); + dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever(); @@ -131,6 +144,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { blobServer, heartbeatServices, metricRegistry, + archivedExecutionGraphStore, this, dispatcherRestEndpoint.getRestAddress()); @@ -143,6 +157,22 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); } + private ArchivedExecutionGraphStore createSerializableExecutionGraphStore( + Configuration configuration, + ScheduledExecutor scheduledExecutor) throws IOException { + final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]); + + final Time expirationTime = Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME)); + final long maximumCacheSizeBytes = configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE); + + return new FileArchivedExecutionGraphStore( + tmpDir, + expirationTime, + maximumCacheSizeBytes, + scheduledExecutor, + Ticker.systemTicker()); + } + @Override protected void stopClusterComponents(boolean cleanupHaData) throws Exception { Throwable exception = null; @@ -183,6 +213,14 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { } } + if 
(archivedExecutionGraphStore != null) { + try { + archivedExecutionGraphStore.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + if (exception != null) { throw new FlinkException("Could not properly shut down the session cluster entry point.", exception); } @@ -215,6 +253,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, + ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress) throws Exception { @@ -228,6 +267,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { blobServer, heartbeatServices, metricRegistry, + archivedExecutionGraphStore, fatalErrorHandler, restAddress); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0c8ee16..6d22e75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -72,6 +72,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -824,7 +825,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast Collection<JobStatus> jobStatuses = Collections.singleton(executionGraph.getState()); return resourceOverviewFuture.thenApply( - (ResourceOverview resourceOverview) -> ClusterOverview.create(resourceOverview, jobStatuses)); + (ResourceOverview resourceOverview) -> new ClusterOverview( + resourceOverview, + JobsOverview.create(jobStatuses))); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java index 69b45b5..4eedbc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java @@ -18,15 +18,11 @@ package org.apache.flink.runtime.messages.webmonitor; -import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.resourcemanager.ResourceOverview; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Collection; - /** * Response to the {@link RequestStatusOverview} message, carrying a description * of the Flink cluster status. 
@@ -73,6 +69,17 @@ public class ClusterOverview extends JobsOverview { this.numSlotsAvailable = numSlotsAvailable; } + public ClusterOverview(ResourceOverview resourceOverview, JobsOverview jobsOverview) { + this( + resourceOverview.getNumberTaskManagers(), + resourceOverview.getNumberRegisteredSlots(), + resourceOverview.getNumberFreeSlots(), + jobsOverview.getNumJobsRunningOrPending(), + jobsOverview.getNumJobsFinished(), + jobsOverview.getNumJobsCancelled(), + jobsOverview.getNumJobsFailed()); + } + public int getNumTaskManagersConnected() { return numTaskManagersConnected; } @@ -128,40 +135,4 @@ public class ClusterOverview extends JobsOverview { ", numJobsFailed=" + getNumJobsFailed() + '}'; } - - public static ClusterOverview create(ResourceOverview resourceOverview, Collection<JobStatus> allJobsStatus) { - Preconditions.checkNotNull(resourceOverview); - Preconditions.checkNotNull(allJobsStatus); - - int numberRunningOrPendingJobs = 0; - int numberFinishedJobs = 0; - int numberCancelledJobs = 0; - int numberFailedJobs = 0; - - for (JobStatus status : allJobsStatus) { - switch (status) { - case FINISHED: - numberFinishedJobs++; - break; - case FAILED: - numberFailedJobs++; - break; - case CANCELED: - numberCancelledJobs++; - break; - default: - numberRunningOrPendingJobs++; - break; - } - } - - return new ClusterOverview( - resourceOverview.getNumberTaskManagers(), - resourceOverview.getNumberRegisteredSlots(), - resourceOverview.getNumberFreeSlots(), - numberRunningOrPendingJobs, - numberFinishedJobs, - numberCancelledJobs, - numberFailedJobs); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java index 3834a76..1d65245 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsOverview.java @@ -18,9 +18,14 @@ package org.apache.flink.runtime.messages.webmonitor; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.util.Preconditions; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Collection; + /** * An overview of how many jobs are in which status. */ @@ -51,7 +56,7 @@ public class JobsOverview implements InfoMessage { @JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished, @JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled, @JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed) { - + this.numJobsRunningOrPending = numJobsRunningOrPending; this.numJobsFinished = numJobsFinished; this.numJobsCancelled = numJobsCancelled; @@ -80,9 +85,9 @@ public class JobsOverview implements InfoMessage { public int getNumJobsFailed() { return numJobsFailed; } - + // ------------------------------------------------------------------------ - + @Override public boolean equals(Object obj) { if (this == obj) { @@ -118,4 +123,42 @@ public class JobsOverview implements InfoMessage { ", numJobsFailed=" + numJobsFailed + '}'; } + + /** + * Combines the given jobs overview with this. 
+ * + * @param jobsOverview to combine with this + * @return Combined jobs overview + */ + public JobsOverview combine(JobsOverview jobsOverview) { + return new JobsOverview(this, jobsOverview); + } + + public static JobsOverview create(Collection<JobStatus> allJobsStatus) { + Preconditions.checkNotNull(allJobsStatus); + + int numberRunningOrPendingJobs = 0; + int numberFinishedJobs = 0; + int numberCancelledJobs = 0; + int numberFailedJobs = 0; + + for (JobStatus status : allJobsStatus) { + switch (status) { + case FINISHED: + numberFinishedJobs++; + break; + case FAILED: + numberFailedJobs++; + break; + case CANCELED: + numberCancelledJobs++; + break; + default: + numberRunningOrPendingJobs++; + break; + } + } + + return new JobsOverview(numberRunningOrPendingJobs, numberFinishedJobs, numberCancelledJobs, numberFailedJobs); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java new file mode 100644 index 0000000..1fc7705 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor; +import org.apache.flink.util.Preconditions; + +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Simple {@link ScheduledExecutor} implementation for testing purposes. 
+ */ +public class ManuallyTriggeredScheduledExecutor extends ManuallyTriggeredDirectExecutor implements ScheduledExecutor { + + private final ConcurrentLinkedQueue<ScheduledTask<?>> scheduledTasks = new ConcurrentLinkedQueue<>(); + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return insertRunnable(command, false); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + final ScheduledTask<V> scheduledTask = new ScheduledTask<>(callable, false); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return insertRunnable(command, true); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return insertRunnable(command, true); + } + + /** + * Triggers all registered tasks. 
+ */ + public void triggerScheduledTasks() { + final Iterator<ScheduledTask<?>> iterator = scheduledTasks.iterator(); + + while (iterator.hasNext()) { + final ScheduledTask<?> scheduledTask = iterator.next(); + + scheduledTask.execute(); + + if (!scheduledTask.isPeriodic) { + iterator.remove(); + } + } + } + + private ScheduledFuture<?> insertRunnable(Runnable command, boolean isPeriodic) { + final ScheduledTask<?> scheduledTask = new ScheduledTask<>( + () -> { + command.run(); + return null; + }, + isPeriodic); + + scheduledTasks.offer(scheduledTask); + + return scheduledTask; + } + + private static final class ScheduledTask<T> implements ScheduledFuture<T> { + + private final Callable<T> callable; + + private final boolean isPeriodic; + + private final CompletableFuture<T> result; + + private ScheduledTask(Callable<T> callable, boolean isPeriodic) { + this.callable = Preconditions.checkNotNull(callable); + this.isPeriodic = isPeriodic; + + this.result = new CompletableFuture<>(); + } + + public void execute() { + if (!result.isDone()) { + if (!isPeriodic) { + try { + result.complete(callable.call()); + } catch (Exception e) { + result.completeExceptionally(e); + } + } else { + try { + callable.call(); + } catch (Exception e) { + result.completeExceptionally(e); + } + } + } + } + + @Override + public long getDelay(TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(Delayed o) { + return 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return result.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return result.isCancelled(); + } + + @Override + public boolean isDone() { + return result.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return result.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return result.get(timeout, unit); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 76a3c98..c9b9bfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -173,6 +173,7 @@ public class DispatcherTest extends TestLogger { new BlobServer(blobServerConfig, new VoidBlobStore()), heartbeatServices, NoOpMetricRegistry.INSTANCE, + new MemoryArchivedExecutionGraphStore(), fatalErrorHandler, TEST_JOB_ID); @@ -344,6 +345,7 @@ public class DispatcherTest extends TestLogger { BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, + ArchivedExecutionGraphStore archivedExecutionGraphStore, FatalErrorHandler fatalErrorHandler, JobID expectedJobId) throws Exception { super( @@ -355,6 +357,7 @@ public class DispatcherTest extends TestLogger { blobServer, heartbeatServices, metricRegistry, + archivedExecutionGraphStore, fatalErrorHandler, null); http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java new file mode 100644 index 0000000..e6d700e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java @@ -0,0 +1,337 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FileArchivedExecutionGraphStore}. + */ +@Category(Flip6.class) +public class FileArchivedExecutionGraphStoreTest extends TestLogger { + + private static final List<JobStatus> GLOBALLY_TERMINAL_JOB_STATUS = Arrays.stream(JobStatus.values()) + .filter(JobStatus::isGloballyTerminalState) + .collect(Collectors.toList()); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Tests that we can put {@link ArchivedExecutionGraph} into the + * {@link FileArchivedExecutionGraphStore} and that the graph is persisted. + */ + @Test + public void testPut() throws IOException { + final ArchivedExecutionGraph dummyExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the storage directory is empty + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(dummyExecutionGraph); + + // check that we have persisted the given execution graph + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + + assertThat(executionGraphStore.get(dummyExecutionGraph.getJobID()), new PartialArchivedExecutionGraphMatcher(dummyExecutionGraph)); + } + } + + /** + * Tests that null is returned if we request an unknown JobID. 
+ */ + @Test + public void testUnknownGet() throws IOException { + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); + } + } + + /** + * Tests that we obtain the correct jobs overview. + */ + @Test + public void testStoredJobsOverview() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs); + + final List<JobStatus> jobStatuses = executionGraphs.stream().map(ArchivedExecutionGraph::getState).collect(Collectors.toList()); + + final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); + + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + for (ArchivedExecutionGraph executionGraph : executionGraphs) { + executionGraphStore.put(executionGraph); + } + + assertThat(executionGraphStore.getStoredJobsOverview(), Matchers.equalTo(expectedJobsOverview)); + } + } + + /** + * Tests that we obtain the correct collection of available job details. 
+ */ + @Test + public void testAvailableJobDetails() throws IOException { + final int numberExecutionGraphs = 10; + final Collection<ArchivedExecutionGraph> executionGraphs = generateTerminalExecutionGraphs(numberExecutionGraphs); + + final Collection<JobDetails> jobDetails = executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList()); + + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + for (ArchivedExecutionGraph executionGraph : executionGraphs) { + executionGraphStore.put(executionGraph); + } + + assertThat(executionGraphStore.getAvailableJobDetails(), Matchers.containsInAnyOrder(jobDetails.toArray())); + } + } + + /** + * Tests that an expired execution graph is removed from the execution graph store. + */ + @Test + public void testExecutionGraphExpiration() throws Exception { + final File rootDir = temporaryFolder.newFolder(); + + final Time expirationTime = Time.milliseconds(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor(); + + final ManualTicker manualTicker = new ManualTicker(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore( + rootDir, + expirationTime, + 10000L, + scheduledExecutor, + manualTicker)) { + + final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + + executionGraphStore.put(executionGraph); + + // there should be one execution graph + assertThat(executionGraphStore.size(), Matchers.equalTo(1)); + + manualTicker.advanceTime(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS); + + // this should trigger the cleanup after expiration + scheduledExecutor.triggerScheduledTasks(); + + assertThat(executionGraphStore.size(), Matchers.equalTo(0)); + + assertThat(executionGraphStore.get(executionGraph.getJobID()), 
Matchers.nullValue()); + + final File storageDirectory = executionGraphStore.getStorageDir(); + + // check that the persisted file has been deleted + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + } + } + + /** + * Tests that all persisted files are cleaned up after closing the store. + */ + @Test + public void testCloseCleansUp() throws IOException { + final File rootDir = temporaryFolder.newFolder(); + + assertThat(rootDir.listFiles().length, Matchers.equalTo(0)); + + try (final FileArchivedExecutionGraphStore executionGraphStore = createDefaultExecutionGraphStore(rootDir)) { + + assertThat(rootDir.listFiles().length, Matchers.equalTo(1)); + + final File storageDirectory = executionGraphStore.getStorageDir(); + + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); + + executionGraphStore.put(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build()); + + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); + } + + assertThat(rootDir.listFiles().length, Matchers.equalTo(0)); + } + + /** + * Tests that evicted {@link ArchivedExecutionGraph} are loaded from disk again. 
+ */ + @Test + public void testCacheLoading() throws IOException { + final File rootDir = temporaryFolder.newFolder(); + + try (final FileArchivedExecutionGraphStore executionGraphStore = new FileArchivedExecutionGraphStore( + rootDir, + Time.hours(1L), + 100L << 10, + TestingUtils.defaultScheduledExecutor(), + Ticker.systemTicker())) { + + final LoadingCache<JobID, ArchivedExecutionGraph> executionGraphCache = executionGraphStore.getArchivedExecutionGraphCache(); + + Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(64); + + boolean continueInserting = true; + + // insert execution graphs until the first one got evicted + while (continueInserting) { + // has roughly a size of 1.4 KB + final ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); + + executionGraphStore.put(executionGraph); + + executionGraphs.add(executionGraph); + + continueInserting = executionGraphCache.size() == executionGraphs.size(); + } + + final File storageDirectory = executionGraphStore.getStorageDir(); + + assertThat(storageDirectory.listFiles().length, Matchers.equalTo(executionGraphs.size())); + + for (ArchivedExecutionGraph executionGraph : executionGraphs) { + assertThat(executionGraphStore.get(executionGraph.getJobID()), matchesPartiallyWith(executionGraph)); + } + } + } + + private Collection<ArchivedExecutionGraph> generateTerminalExecutionGraphs(int number) { + final Collection<ArchivedExecutionGraph> executionGraphs = new ArrayList<>(number); + + for (int i = 0; i < number; i++) { + final JobStatus state = GLOBALLY_TERMINAL_JOB_STATUS.get(ThreadLocalRandom.current().nextInt(GLOBALLY_TERMINAL_JOB_STATUS.size())); + executionGraphs.add( + new ArchivedExecutionGraphBuilder() + .setState(state) + .build()); + } + + return executionGraphs; + } + + private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File storageDirectory) throws IOException { + return new FileArchivedExecutionGraphStore( 
+ storageDirectory, + Time.hours(1L), + 10000L, + TestingUtils.defaultScheduledExecutor(), + Ticker.systemTicker()); + } + + private static final class ManualTicker extends Ticker { + + private long currentTime = 0; + + @Override + public long read() { + return currentTime; + } + + void advanceTime(long duration, TimeUnit timeUnit) { + currentTime += timeUnit.toNanos(duration); + } + } + + private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> { + + private final ArchivedExecutionGraph archivedExecutionGraph; + + private PartialArchivedExecutionGraphMatcher(ArchivedExecutionGraph expectedArchivedExecutionGraph) { + this.archivedExecutionGraph = Preconditions.checkNotNull(expectedArchivedExecutionGraph); + } + + @Override + public boolean matches(Object o) { + if (archivedExecutionGraph == o) { + return true; + } + if (o == null || archivedExecutionGraph.getClass() != o.getClass()) { + return false; + } + ArchivedExecutionGraph that = (ArchivedExecutionGraph) o; + return archivedExecutionGraph.isStoppable() == that.isStoppable() && + Objects.equals(archivedExecutionGraph.getJobID(), that.getJobID()) && + Objects.equals(archivedExecutionGraph.getJobName(), that.getJobName()) && + archivedExecutionGraph.getState() == that.getState() && + Objects.equals(archivedExecutionGraph.getJsonPlan(), that.getJsonPlan()) && + Objects.equals(archivedExecutionGraph.getAccumulatorsSerialized(), that.getAccumulatorsSerialized()) && + Objects.equals(archivedExecutionGraph.getCheckpointCoordinatorConfiguration(), that.getCheckpointCoordinatorConfiguration()) && + archivedExecutionGraph.getAllVertices().size() == that.getAllVertices().size(); + } + + @Override + public void describeTo(Description description) { + description.appendText("Matches against " + ArchivedExecutionGraph.class.getSimpleName() + '.'); + } + } + + private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(ArchivedExecutionGraph executionGraph) { + 
return new PartialArchivedExecutionGraphMatcher(executionGraph); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b817f0f/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java new file mode 100644 index 0000000..9bfdbb3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedExecutionGraphStore.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * {@link ArchivedExecutionGraphStore} implementation which stores the {@link ArchivedExecutionGraph} + * in memory. + */ +public class MemoryArchivedExecutionGraphStore implements ArchivedExecutionGraphStore { + + private final Map<JobID, ArchivedExecutionGraph> serializableExecutionGraphs = new HashMap<>(4); + + @Override + public int size() { + return serializableExecutionGraphs.size(); + } + + @Nullable + @Override + public ArchivedExecutionGraph get(JobID jobId) { + return serializableExecutionGraphs.get(jobId); + } + + @Override + public void put(ArchivedExecutionGraph serializableExecutionGraph) throws IOException { + serializableExecutionGraphs.put(serializableExecutionGraph.getJobID(), serializableExecutionGraph); + } + + @Override + public JobsOverview getStoredJobsOverview() { + Collection<JobStatus> allJobStatus = serializableExecutionGraphs.values().stream() + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); + + return JobsOverview.create(allJobStatus); + } + + @Override + public Collection<JobDetails> getAvailableJobDetails() { + return serializableExecutionGraphs.values().stream() + .map(WebMonitorUtils::createDetailsForJob) + .collect(Collectors.toList()); + } + + @Override + public void close() throws IOException { + serializableExecutionGraphs.clear(); + } +}
