DRILL-2100: Delete temporary spill directories when the query finishes.

This closes #454


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/38e1016c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/38e1016c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/38e1016c

Branch: refs/heads/master
Commit: 38e1016c49786acaacb153ee37784b3ce3023eb5
Parents: 1a89a7f
Author: Vitalii Diravka <vitalii.dira...@gmail.com>
Authored: Mon Mar 28 18:05:22 2016 +0000
Committer: Parth Chandra <par...@apache.org>
Committed: Tue May 3 10:50:09 2016 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  | 31 ++++++++++++++++++--
 1 file changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/38e1016c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0ee518e..32df705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -22,8 +22,10 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
@@ -72,6 +74,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
@@ -116,6 +119,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private boolean first = true;
   private int targetRecordCount;
   private final String fileName;
+  private Set<Path> currSpillDirs = Sets.newTreeSet();
   private int firstSpillBatchCount = 0;
   private int peakNumBatches = -1;
 
@@ -158,7 +162,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
         PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
     FragmentHandle handle = context.getHandle();
-    fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+    fileName = String.format("%s_majorfragment%s_minorfragment%s_operator%s", QueryIdHelper.getQueryId(handle.getQueryId()),
         handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
   }
 
@@ -223,7 +227,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         if (mSorter != null) {
           mSorter.clear();
         }
-
+        for(Iterator iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) {
+            Path path = (Path)iter.next();
+            try {
+                if (fs != null && path != null && fs.exists(path)) {
+                    if (fs.delete(path, true)) {
+                        fs.cancelDeleteOnExit(path);
+                    }
+                }
+            } catch (IOException e) {
+                // since this is meant to be used in a batch's cleanup, we don't propagate the exception
+                logger.warn("Unable to delete spill directory " + path,  e);
+            }
+        }
       }
 
     }
@@ -554,7 +570,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
 
-    String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
+    String spillDir = dirs.next();
+    Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName));
+    currSpillDirs.add(currSpillPath);
+    String outputFile = Joiner.on("/").join(currSpillPath, spillCount++);
+    try {
+        fs.deleteOnExit(currSpillPath);
+    } catch (IOException e) {
+        // since this is meant to be used in a batch's spilling, we don't propagate the exception
+        logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e);
+    }
     stats.setLongStat(Metric.SPILL_COUNT, spillCount);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext);
     try (AutoCloseable a = AutoCloseables.all(batchGroupList)) {
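----------------------------------------------------------------------

The sketch below distills the cleanup pattern this patch introduces: each spill directory is recorded in a set and registered with FileSystem.deleteOnExit() when it is created; when the batch is closed, the directories are deleted recursively and the exit hook is cancelled, with IOExceptions logged rather than propagated. This is a minimal standalone illustration, not Drill code: the class name SpillDirCleanupSketch, the /tmp/drill/spill base directory, and the use of a local Hadoop FileSystem are assumptions made for the example.

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SpillDirCleanupSketch {
  private final FileSystem fs;
  // Spill directories created so far; each is also registered for
  // delete-on-exit so it is removed even if the JVM dies before close().
  private final Set<Path> currSpillDirs = new TreeSet<>();

  public SpillDirCleanupSketch(FileSystem fs) {
    this.fs = fs;
  }

  /** Registers a per-operator spill directory under the given base dir. */
  public Path newSpillDir(String baseDir, String dirName) {
    Path spillPath = new Path(baseDir + "/" + dirName);
    currSpillDirs.add(spillPath);
    try {
      // Best effort: the FileSystem removes the path on JVM exit if close()
      // never runs.
      fs.deleteOnExit(spillPath);
    } catch (IOException e) {
      // Spilling should not fail just because the directory could not be
      // marked for deletion.
      System.err.println("Unable to mark " + spillPath + " for delete-on-exit: " + e);
    }
    return spillPath;
  }

  /** Deletes all registered spill directories; called when the query finishes. */
  public void close() {
    for (Iterator<Path> it = currSpillDirs.iterator(); it.hasNext(); it.remove()) {
      Path path = it.next();
      try {
        if (fs.exists(path) && fs.delete(path, true)) {
          // The directory is gone; the exit hook no longer needs to try.
          fs.cancelDeleteOnExit(path);
        }
      } catch (IOException e) {
        // Cleanup is best effort; do not propagate, mirroring the patch.
        System.err.println("Unable to delete spill directory " + path + ": " + e);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    SpillDirCleanupSketch sketch = new SpillDirCleanupSketch(fs);
    Path dir = sketch.newSpillDir("/tmp/drill/spill",
        "query1_majorfragment0_minorfragment0_operator5");
    fs.mkdirs(dir);
    sketch.close(); // removes /tmp/drill/spill/query1_... recursively
  }
}

The flat directory name used in main() mirrors the patch's new "%s_majorfragment%s_minorfragment%s_operator%s" format, which keeps each spill location a single directory that can be removed in one recursive delete.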
