Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/417#discussion_r24931138
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
---
@@ -52,26 +57,88 @@
/**
* Constructs a new IOManager.
*
-* @param paths
-*the basic directory paths for files underlying anonymous
channels.
+* @param tempDirs The basic directories for files underlying anonymous
channels.
*/
- protected IOManager(String[] paths) {
- this.paths = paths;
+ protected IOManager(String[] tempDirs) {
+ if (tempDirs == null || tempDirs.length == 0) {
+ throw new IllegalArgumentException("The temporary
directories must not be null or empty.");
+ }
+
this.random = new Random();
this.nextPath = 0;
+
+ this.paths = new File[tempDirs.length];
+ for (int i = 0; i < tempDirs.length; i++) {
+ File baseDir = new File(tempDirs[i]);
+ String subfolder = String.format("flink-io-%s",
UUID.randomUUID().toString());
+ File storageDir = new File(baseDir, subfolder);
+
+ if (!storageDir.exists() && !storageDir.mkdirs()) {
+ throw new RuntimeException(
+ "Could not create storage
directory for IOManager: " + storageDir.getAbsolutePath());
+ }
+ paths[i] = storageDir;
+ LOG.info("I/O manager uses directory {} for spill
files.", storageDir.getAbsolutePath());
+ }
+
+ this.shutdownHook = new Thread("I/O manager shutdown hook") {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
/**
-* Close method, marks the I/O manager as closed.
+* Close method, marks the I/O manager as closed
+* and removed all temporary files.
*/
- public abstract void shutdown();
+ public void shutdown() {
+ // remove all of our temp directories
+ for (File path : paths) {
+ try {
+ if (path != null) {
+ if (path.exists()) {
+ FileUtils.deleteDirectory(path);
+ LOG.info("I/O manager removed
spill file directory {}", path.getAbsolutePath());
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("IOManager failed to properly clean
up temp file directory: " + path, t);
+ }
+ }
+
+ // Remove shutdown hook to prevent resource leaks, unless this
is invoked by the shutdown hook itself
+ if (shutdownHook != Thread.currentThread()) {
+ try {
+
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can
safely ignore this
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while unregistering
IOManager's shutdown hook.", t);
+ }
+ }
+ }
/**
* Utility method to check whether the IO manager has been properly
shut down.
+* For this base implementation, this means that all files have been
removed.
*
* @return True, if the IO manager has properly shut down, false
otherwise.
*/
- public abstract boolean isProperlyShutDown();
+ public boolean isProperlyShutDown() {
+ for (File path : paths) {
+ if (path != null) {
--- End diff --
Would this be easier to read with the check {{if (path != null && path.exists())}}?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---