[ https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339571#comment-16339571 ]

ASF GitHub Bot commented on FLINK-8453:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5310#discussion_r163915273
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.blob.BlobUtils;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.webmonitor.JobDetails;
    +import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
    +import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Store for {@link ArchivedExecutionGraph}. The store writes the archived execution graph to disk
    + * and keeps the most recently used execution graphs in a memory cache for faster serving. Moreover,
    + * the stored execution graphs are periodically cleaned up.
    + */
    +public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
    +
    +   private final File storageDir;
    +
    +   private final Cache<JobID, JobDetails> jobDetailsCache;
    +
    +   private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
    +
    +   private final ScheduledFuture<?> cleanupFuture;
    +
    +   private final Thread shutdownHook;
    +
    +   private int numFinishedJobs;
    +
    +   private int numFailedJobs;
    +
    +   private int numCanceledJobs;
    +
    +   public FileArchivedExecutionGraphStore(
    +                   File rootDir,
    +                   Time expirationTime,
    +                   long maximumCacheSizeBytes,
    +                   ScheduledExecutor scheduledExecutor) throws IOException {
    +
    +           final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
    +
    +           LOG.info(
    +                   "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
    +                   FileArchivedExecutionGraphStore.class.getSimpleName(),
    +                   storageDirectory,
    +                   expirationTime.toMilliseconds(),
    +                   maximumCacheSizeBytes);
    +
    +           this.storageDir = Preconditions.checkNotNull(storageDirectory);
    +           Preconditions.checkArgument(
    +                   storageDirectory.exists() && storageDirectory.isDirectory(),
    +                   "The storage directory must exist and be a directory.");
    +           this.jobDetailsCache = CacheBuilder.newBuilder()
    +                   .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
    +                   .removalListener(
    +                           (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
    +                   .build();
    +
    +           this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
    +                   .maximumWeight(maximumCacheSizeBytes)
    +                   .weigher(this::calculateSize)
    +                   .build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
    +                           @Override
    +                           public ArchivedExecutionGraph load(JobID jobId) throws Exception {
    +                                   return loadExecutionGraph(jobId);
    +                           }});
    +
    +           this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
    +                   jobDetailsCache::cleanUp,
    +                   expirationTime.toMilliseconds(),
    +                   expirationTime.toMilliseconds(),
    +                   TimeUnit.MILLISECONDS);
    +
    +           this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    +
    +           this.numFinishedJobs = 0;
    +           this.numFailedJobs = 0;
    +           this.numCanceledJobs = 0;
    +   }
    +
    +   @Override
    +   public int size() {
    +           return Math.toIntExact(jobDetailsCache.size());
    +   }
    +
    +   @Override
    +   @Nullable
    +   public ArchivedExecutionGraph get(JobID jobId) {
    +           try {
    +                   return archivedExecutionGraphCache.get(jobId);
    +           } catch (ExecutionException e) {
    +                   LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
    +                   return null;
    +           }
    +   }
    +
    +   @Override
    +   public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +           final JobStatus jobStatus = archivedExecutionGraph.getState();
    +           final JobID jobId = archivedExecutionGraph.getJobID();
    +           final String jobName = archivedExecutionGraph.getJobName();
    +
    +           Preconditions.checkArgument(
    +                   jobStatus.isGloballyTerminalState(),
    +                   "The job " + jobName + '(' + jobId +
    +                           ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');
    +
    +           switch (jobStatus) {
    +                   case FINISHED:
    +                           numFinishedJobs++;
    +                           break;
    +                   case CANCELED:
    +                           numCanceledJobs++;
    +                           break;
    +                   case FAILED:
    +                           numFailedJobs++;
    +                           break;
    +                   default:
    +                           throw new IllegalStateException("The job " + jobName + '(' +
    +                                   jobId + ") should have been in a globally terminal state. " +
    +                                   "Instead it was in state " + jobStatus + '.');
    +           }
    +
    +           // write the ArchivedExecutionGraph to disk
    +           storeArchivedExecutionGraph(archivedExecutionGraph);
    +
    +           final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    +
    +           jobDetailsCache.put(jobId, detailsForJob);
    +           archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    +   }
    +
    +   @Override
    +   public JobsOverview getStoredJobsOverview() {
    +           return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    +   }
    +
    +   @Override
    +   public Collection<JobDetails> getAvailableJobDetails() {
    +           return jobDetailsCache.asMap().values();
    +   }
    +
    +   @Override
    +   public void close() throws IOException {
    +           cleanupFuture.cancel(false);
    +
    +           jobDetailsCache.invalidateAll();
    +
    +           // clean up the storage directory
    +           FileUtils.deleteFileOrDirectory(storageDir);
    +
    +           // Remove shutdown hook to prevent resource leaks, unless this is invoked by the
    +           // shutdown hook itself
    +           if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
    +                   try {
    +                           Runtime.getRuntime().removeShutdownHook(shutdownHook);
    +                   }
    +                   catch (IllegalStateException e) {
    +                           // race, JVM is in shutdown already, we can safely ignore this
    +                   }
    +                   catch (Throwable t) {
    +                           LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
    +                   }
    +           }
    +   }
    +
    +   // --------------------------------------------------------------
    +   // Internal methods
    +   // --------------------------------------------------------------
    +
    +   private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
    +           final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +           if (archivedExecutionGraphFile.exists()) {
    +                   return Math.toIntExact(archivedExecutionGraphFile.length());
    +           } else {
    +                   LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
    +                   return serializableExecutionGraph.getAllVertices().size() * 1000 +
    +                           serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
    +           }
    +   }
    +
    +   private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
    +           final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +           if (archivedExecutionGraphFile.exists()) {
    +                   try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
    +                           return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
    +                   }
    +           } else {
    +                   throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
    +                           ". This indicates that the file either has been deleted or never written.");
    +           }
    +   }
    +
    +   private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
    +           final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());
    +
    +           try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
    +                   InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
    +           }
    +   }
    +
    +   private File getExecutionGraphFile(JobID jobId) {
    +           return new File(storageDir, jobId.toString());
    +   }
    +
    +   private void deleteExecutionGraphFile(JobID jobId) {
    +           Preconditions.checkNotNull(jobId);
    +
    +           final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    +
    +           try {
    +                   FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
    +           } catch (IOException e) {
    +                   LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
    +           }
    +
    +           archivedExecutionGraphCache.invalidate(jobId);
    +           jobDetailsCache.invalidate(jobId);
    +   }
    +
    +   private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
    +           final int maxAttempts = 10;
    +
    +           for (int attempt = 0; attempt < maxAttempts; attempt++) {
    --- End diff ---
    
    Why do you expect it to fail sometimes?
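
    For context, the truncated method appears to retry creating a uniquely
    named storage directory under the given root. A minimal sketch of that
    pattern (the "executionGraphStore-" name prefix and the use of
    UUID.randomUUID() are assumptions inferred from the class's
    java.util.UUID import, not necessarily the actual code):

        private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
            final int maxAttempts = 10;

            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                // Assumed naming scheme, based on the java.util.UUID import.
                final File storageDirectory = new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());

                // mkdirs() returns false on, e.g., a name collision or an
                // I/O-level refusal, hence the bounded number of retries.
                if (storageDirectory.mkdirs()) {
                    return storageDirectory;
                }
            }

            throw new IOException("Could not create a storage directory in " + tmpDir + '.');
        }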


> Add SerializableExecutionGraphStore to Dispatcher
> -------------------------------------------------
>
>                 Key: FLINK-8453
>                 URL: https://issues.apache.org/jira/browse/FLINK-8453
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, REST
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it
> can use to store completed jobs. This store can then be used to serve
> historic job requests from the web UI, for example. The default
> implementation should persist the jobs to disk and evict the in-memory
> instances once they grow too big in order to avoid memory leaks.
> Additionally, the store should expire elements from disk after a
> user-defined time.
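
A hedged usage sketch based on the constructor and methods shown in the PR
above (the variable names and concrete values are illustrative assumptions,
not taken from the actual Flink code):

    // Keep completed jobs on disk for one hour; hold at most 16 MiB in memory.
    FileArchivedExecutionGraphStore store = new FileArchivedExecutionGraphStore(
            tmpDir, Time.hours(1L), 16L * 1024L * 1024L, scheduledExecutor);

    store.put(archivedExecutionGraph);               // serializes the graph to disk and caches it
    ArchivedExecutionGraph graph = store.get(jobId); // served from memory, reloaded from disk on a miss
    store.close();                                   // cancels the cleanup task and deletes the storage directory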


