[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...

2015-02-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/417#issuecomment-75019114
  
Good comments. I address them and merge this.

The tests that fail are not related to this change (there is an instability 
in the TaskManager tests currently)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...

2015-02-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/417


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...

2015-02-18 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/417#discussion_r24931138
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 ---
@@ -52,26 +57,88 @@
/**
 * Constructs a new IOManager.
 *
-* @param paths
-*the basic directory paths for files underlying anonymous 
channels.
+* @param tempDirs The basic directories for files underlying anonymous 
channels.
 */
-   protected IOManager(String[] paths) {
-   this.paths = paths;
+   protected IOManager(String[] tempDirs) {
+   if (tempDirs == null || tempDirs.length == 0) {
+   throw new IllegalArgumentException(The temporary 
directories must not be null or empty.);
+   }
+
this.random = new Random();
this.nextPath = 0;
+
+   this.paths = new File[tempDirs.length];
+   for (int i = 0; i  tempDirs.length; i++) {
+   File baseDir = new File(tempDirs[i]);
+   String subfolder = String.format(flink-io-%s, 
UUID.randomUUID().toString());
+   File storageDir = new File(baseDir, subfolder);
+
+   if (!storageDir.exists()  !storageDir.mkdirs()) {
+   throw new RuntimeException(
+   Could not create storage 
directory for IOManager:  + storageDir.getAbsolutePath());
+   }
+   paths[i] = storageDir;
+   LOG.info(I/O manager uses directory {} for spill 
files., storageDir.getAbsolutePath());
+   }
+
+   this.shutdownHook = new Thread(I/O manager shutdown hook) {
+   @Override
+   public void run() {
+   shutdown();
+   }
+   };
+   Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
 
/**
-* Close method, marks the I/O manager as closed.
+* Close method, marks the I/O manager as closed
+* and removed all temporary files.
 */
-   public abstract void shutdown();
+   public void shutdown() {
+   // remove all of our temp directories
+   for (File path : paths) {
+   try {
+   if (path != null) {
+   if (path.exists()) {
+   FileUtils.deleteDirectory(path);
+   LOG.info(I/O manager removed 
spill file directory {}, path.getAbsolutePath());
+   }
+   }
+   } catch (Throwable t) {
+   LOG.error(IOManager failed to properly clean 
up temp file directory:  + path, t);
+   }
+   }
+
+   // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the shutdown hook itself
+   if (shutdownHook != Thread.currentThread()) {
+   try {
+   
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+   }
+   catch (IllegalStateException e) {
+   // race, JVM is in shutdown already, we can 
safely ignore this
+   }
+   catch (Throwable t) {
+   LOG.warn(Exception while unregistering 
IOManager's shutdown hook., t);
+   }
+   }
+   }
 
/**
 * Utility method to check whether the IO manager has been properly 
shut down.
+* For this base implementation, this means that all files have been 
removed.
 *
 * @return True, if the IO manager has properly shut down, false 
otherwise.
 */
-   public abstract boolean isProperlyShutDown();
+   public boolean isProperlyShutDown() {
+   for (File path : paths) {
+   if (path != null) {
--- End diff --

Would this easier to read with check  {{if(path != null  path.exists())}}


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...

2015-02-18 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/417#issuecomment-74930615
  
Since the IOManager add shutdown hook to clean up the files, should 
IOManagerAsync#isProperlyShutDown need to call super.isProperlyShutDown ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---