DRILL-2100: Delete temporary spill directories when the query is finished.
This closes #454

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/38e1016c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/38e1016c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/38e1016c

Branch: refs/heads/master
Commit: 38e1016c49786acaacb153ee37784b3ce3023eb5
Parents: 1a89a7f
Author: Vitalii Diravka <vitalii.dira...@gmail.com>
Authored: Mon Mar 28 18:05:22 2016 +0000
Committer: Parth Chandra <par...@apache.org>
Committed: Tue May 3 10:50:09 2016 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java | 31 ++++++++++++++++++--
 1 file changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/38e1016c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0ee518e..32df705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -22,8 +22,10 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
@@ -72,6 +74,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
@@ -116,6 +119,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private boolean first = true;
   private int targetRecordCount;
   private final String fileName;
+  private Set<Path> currSpillDirs = Sets.newTreeSet();
   private int firstSpillBatchCount = 0;
   private int peakNumBatches = -1;
 
@@ -158,7 +162,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
         PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
     FragmentHandle handle = context.getHandle();
-    fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+    fileName = String.format("%s_majorfragment%s_minorfragment%s_operator%s", QueryIdHelper.getQueryId(handle.getQueryId()),
         handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
   }
 
@@ -223,7 +227,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       if (mSorter != null) {
         mSorter.clear();
       }
-
+      for(Iterator iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) {
+          Path path = (Path)iter.next();
+          try {
+              if (fs != null && path != null && fs.exists(path)) {
+                  if (fs.delete(path, true)) {
+                      fs.cancelDeleteOnExit(path);
+                  }
+              }
+          } catch (IOException e) {
+              // since this is meant to be used in a batches's cleanup, we don't propagate the exception
+              logger.warn("Unable to delete spill directory " + path, e);
+          }
+      }
     }
   }
 
@@ -554,7 +570,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
 
-    String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
+    String spillDir = dirs.next();
+    Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName));
+    currSpillDirs.add(currSpillPath);
+    String outputFile = Joiner.on("/").join(currSpillPath, spillCount++);
+    try {
+      fs.deleteOnExit(currSpillPath);
+    } catch (IOException e) {
+      // since this is meant to be used in a batches's spilling, we don't propagate the exception
+      logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e);
+    }
     stats.setLongStat(Metric.SPILL_COUNT, spillCount);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext);
     try (AutoCloseable a = AutoCloseables.all(batchGroupList)) {
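For readers following the patch, the lifecycle it implements can be sketched in isolation. The snippet below is a minimal, hypothetical illustration, not Drill code: the class name SpillDirCleanupSketch, the methods newSpillDir/deleteSpillDirs, and the use of the local filesystem are assumptions for the example. What it mirrors from the diff is the pattern: when a spill directory is created it is tracked and registered with FileSystem.deleteOnExit() as a safety net; when the operator finishes, each tracked directory is deleted explicitly, cancelDeleteOnExit() is called only after a successful delete, and IOExceptions are logged rather than propagated so cleanup cannot fail the query.

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical stand-alone sketch of the spill-directory lifecycle used by the patch.
public class SpillDirCleanupSketch {
  private static final Logger logger = LoggerFactory.getLogger(SpillDirCleanupSketch.class);

  private final FileSystem fs;
  private final Set<Path> currSpillDirs = new TreeSet<>();

  public SpillDirCleanupSketch(Configuration conf) throws IOException {
    // The example uses the local filesystem; Drill resolves the spill FileSystem from its config.
    this.fs = FileSystem.getLocal(conf);
  }

  // Called when a spill is about to happen: track the directory and mark it for
  // deletion on JVM exit as a fallback in case explicit cleanup never runs.
  public Path newSpillDir(String spillRoot, String fileName) throws IOException {
    Path dir = new Path(spillRoot, fileName);
    currSpillDirs.add(dir);
    fs.deleteOnExit(dir);
    return dir;
  }

  // Called when the query/operator finishes: delete each tracked directory eagerly
  // and cancel the exit hook once the delete succeeded. Failures are only logged.
  public void deleteSpillDirs() {
    for (Iterator<Path> iter = currSpillDirs.iterator(); iter.hasNext(); iter.remove()) {
      Path path = iter.next();
      try {
        if (fs.exists(path) && fs.delete(path, true)) {
          fs.cancelDeleteOnExit(path);
        }
      } catch (IOException e) {
        // cleanup should never fail the caller, so the exception is not propagated
        logger.warn("Unable to delete spill directory " + path, e);
      }
    }
  }
}

Calling cancelDeleteOnExit() only after a successful delete leaves the JVM-exit hook in place for any directory that could not be removed, which matches the intent of the change: explicit cleanup when the query completes, with deleteOnExit as a best-effort fallback for abnormal termination.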