[ 
https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326411#comment-14326411
 ] 

ASF GitHub Bot commented on FLINK-1483:
---------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/417#discussion_r24931138
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 ---
    @@ -52,26 +57,88 @@
        /**
         * Constructs a new IOManager.
         *
    -    * @param paths
    -    *        the basic directory paths for files underlying anonymous 
channels.
    +    * @param tempDirs The basic directories for files underlying anonymous 
channels.
         */
    -   protected IOManager(String[] paths) {
    -           this.paths = paths;
    +   protected IOManager(String[] tempDirs) {
    +           if (tempDirs == null || tempDirs.length == 0) {
    +                   throw new IllegalArgumentException("The temporary 
directories must not be null or empty.");
    +           }
    +
                this.random = new Random();
                this.nextPath = 0;
    +
    +           this.paths = new File[tempDirs.length];
    +           for (int i = 0; i < tempDirs.length; i++) {
    +                   File baseDir = new File(tempDirs[i]);
    +                   String subfolder = String.format("flink-io-%s", 
UUID.randomUUID().toString());
    +                   File storageDir = new File(baseDir, subfolder);
    +
    +                   if (!storageDir.exists() && !storageDir.mkdirs()) {
    +                           throw new RuntimeException(
    +                                           "Could not create storage 
directory for IOManager: " + storageDir.getAbsolutePath());
    +                   }
    +                   paths[i] = storageDir;
    +                   LOG.info("I/O manager uses directory {} for spill 
files.", storageDir.getAbsolutePath());
    +           }
    +
    +           this.shutdownHook = new Thread("I/O manager shutdown hook") {
    +                   @Override
    +                   public void run() {
    +                           shutdown();
    +                   }
    +           };
    +           Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        }
     
        /**
    -    * Close method, marks the I/O manager as closed.
    +    * Close method, marks the I/O manager as closed
    +    * and removed all temporary files.
         */
    -   public abstract void shutdown();
    +   public void shutdown() {
    +           // remove all of our temp directories
    +           for (File path : paths) {
    +                   try {
    +                           if (path != null) {
    +                                   if (path.exists()) {
    +                                           FileUtils.deleteDirectory(path);
    +                                           LOG.info("I/O manager removed 
spill file directory {}", path.getAbsolutePath());
    +                                   }
    +                           }
    +                   } catch (Throwable t) {
    +                           LOG.error("IOManager failed to properly clean 
up temp file directory: " + path, t);
    +                   }
    +           }
    +
    +           // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the shutdown hook itself
    +           if (shutdownHook != Thread.currentThread()) {
    +                   try {
    +                           
Runtime.getRuntime().removeShutdownHook(shutdownHook);
    +                   }
    +                   catch (IllegalStateException e) {
    +                           // race, JVM is in shutdown already, we can 
safely ignore this
    +                   }
    +                   catch (Throwable t) {
    +                           LOG.warn("Exception while unregistering 
IOManager's shutdown hook.", t);
    +                   }
    +           }
    +   }
     
        /**
         * Utility method to check whether the IO manager has been properly 
shut down.
    +    * For this base implementation, this means that all files have been 
removed.
         *
         * @return True, if the IO manager has properly shut down, false 
otherwise.
         */
    -   public abstract boolean isProperlyShutDown();
    +   public boolean isProperlyShutDown() {
    +           for (File path : paths) {
    +                   if (path != null) {
    --- End diff --
    
    Would this easier to read with check  {{if(path != null && path.exists())}}


> Temporary channel files are not properly deleted when Flink is terminated
> -------------------------------------------------------------------------
>
>                 Key: FLINK-1483
>                 URL: https://issues.apache.org/jira/browse/FLINK-1483
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 0.8, 0.9
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>
> The temporary channel files are not properly deleted if the IOManager does 
> not shut down properly. This can be the case when the TaskManagers are 
> terminated by Flink's shell scripts.
> A solution could be to store all channel files of one TaskManager in a 
> uniquely identifiable directory and to register a shutdown hook which deletes 
> this file upon termination.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to