Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/417#discussion_r24931138 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java --- @@ -52,26 +57,88 @@ /** * Constructs a new IOManager. * - * @param paths - * the basic directory paths for files underlying anonymous channels. + * @param tempDirs The basic directories for files underlying anonymous channels. */ - protected IOManager(String[] paths) { - this.paths = paths; + protected IOManager(String[] tempDirs) { + if (tempDirs == null || tempDirs.length == 0) { + throw new IllegalArgumentException("The temporary directories must not be null or empty."); + } + this.random = new Random(); this.nextPath = 0; + + this.paths = new File[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + File baseDir = new File(tempDirs[i]); + String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString()); + File storageDir = new File(baseDir, subfolder); + + if (!storageDir.exists() && !storageDir.mkdirs()) { + throw new RuntimeException( + "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath()); + } + paths[i] = storageDir; + LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); + } + + this.shutdownHook = new Thread("I/O manager shutdown hook") { + @Override + public void run() { + shutdown(); + } + }; + Runtime.getRuntime().addShutdownHook(this.shutdownHook); } /** - * Close method, marks the I/O manager as closed. + * Close method, marks the I/O manager as closed + * and removed all temporary files. 
*/ - public abstract void shutdown(); + public void shutdown() { + // remove all of our temp directories + for (File path : paths) { + try { + if (path != null) { + if (path.exists()) { + FileUtils.deleteDirectory(path); + LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); + } + } + } catch (Throwable t) { + LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); + } + } + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); + } + } + } /** * Utility method to check whether the IO manager has been properly shut down. + * For this base implementation, this means that all files have been removed. * * @return True, if the IO manager has properly shut down, false otherwise. */ - public abstract boolean isProperlyShutDown(); + public boolean isProperlyShutDown() { + for (File path : paths) { + if (path != null) { --- End diff -- Would this be easier to read with a single combined check, {{if(path != null && path.exists())}}?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---