[ https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326411#comment-14326411 ]
ASF GitHub Bot commented on FLINK-1483: --------------------------------------- Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/417#discussion_r24931138 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java --- @@ -52,26 +57,88 @@ /** * Constructs a new IOManager. * - * @param paths - * the basic directory paths for files underlying anonymous channels. + * @param tempDirs The basic directories for files underlying anonymous channels. */ - protected IOManager(String[] paths) { - this.paths = paths; + protected IOManager(String[] tempDirs) { + if (tempDirs == null || tempDirs.length == 0) { + throw new IllegalArgumentException("The temporary directories must not be null or empty."); + } + this.random = new Random(); this.nextPath = 0; + + this.paths = new File[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + File baseDir = new File(tempDirs[i]); + String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString()); + File storageDir = new File(baseDir, subfolder); + + if (!storageDir.exists() && !storageDir.mkdirs()) { + throw new RuntimeException( + "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath()); + } + paths[i] = storageDir; + LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); + } + + this.shutdownHook = new Thread("I/O manager shutdown hook") { + @Override + public void run() { + shutdown(); + } + }; + Runtime.getRuntime().addShutdownHook(this.shutdownHook); } /** - * Close method, marks the I/O manager as closed. + * Close method, marks the I/O manager as closed + * and removed all temporary files. */ - public abstract void shutdown(); + public void shutdown() { + // remove all of our temp directories + for (File path : paths) { + try { + if (path != null) { + if (path.exists()) { + FileUtils.deleteDirectory(path); + LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); + } + } + } catch (Throwable t) { + LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); + } + } + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); + } + } + } /** * Utility method to check whether the IO manager has been properly shut down. + * For this base implementation, this means that all files have been removed. * * @return True, if the IO manager has properly shut down, false otherwise. */ - public abstract boolean isProperlyShutDown(); + public boolean isProperlyShutDown() { + for (File path : paths) { + if (path != null) { --- End diff -- Would this easier to read with check {{if(path != null && path.exists())}} > Temporary channel files are not properly deleted when Flink is terminated > ------------------------------------------------------------------------- > > Key: FLINK-1483 > URL: https://issues.apache.org/jira/browse/FLINK-1483 > Project: Flink > Issue Type: Bug > Components: TaskManager > Affects Versions: 0.8, 0.9 > Reporter: Till Rohrmann > Assignee: Ufuk Celebi > > The temporary channel files are not properly deleted if the IOManager does > not shut down properly. This can be the case when the TaskManagers are > terminated by Flink's shell scripts. > A solution could be to store all channel files of one TaskManager in a > uniquely identifiable directory and to register a shutdown hook which deletes > this file upon termination. -- This message was sent by Atlassian JIRA (v6.3.4#6332)