DRILL-909: Handle OOM in UnlimitedRawBatchBuffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d9a2f1c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d9a2f1c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d9a2f1c9 Branch: refs/heads/master Commit: d9a2f1c9ac45ff31742282b6319891bec08de745 Parents: 7c1ee01 Author: Steven Phillips <[email protected]> Authored: Wed Jun 11 18:57:08 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 11 21:25:16 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/xsort/ExternalSortBatch.java | 2 +- .../work/batch/UnlimitedRawBatchBuffer.java | 23 ++++++++++++++++++++ .../impl/xsort/TestSimpleExternalSort.java | 1 - 3 files changed, 24 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index d4c0b25..5582742 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -243,7 +243,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // logger.debug("Took {} us to sort {} records", t, count); break; case OUT_OF_MEMORY: - mergeAndSpill(); + if (batchesSinceLastSpill > 2) mergeAndSpill(); batchesSinceLastSpill = 0; break; default: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index a726a82..d14e50c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -34,6 +34,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ private final int softlimit; private final int startlimit; private final AtomicBoolean overlimit = new AtomicBoolean(false); + private final AtomicBoolean outOfMemory = new AtomicBoolean(false); private final ReadController readController; private final boolean multiFragment; @@ -57,6 +58,15 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void enqueue(RawFragmentBatch batch) { + if (batch.getHeader().getIsOutOfMemory()) { + logger.debug("Setting autoread false"); + readController.setAutoRead(false); + if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) { + buffer.addFirst(batch); + } + outOfMemory.set(true); + return; + } buffer.add(batch); if(buffer.size() == softlimit){ overlimit.set(true); @@ -85,6 +95,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public RawFragmentBatch getNext(){ + if (outOfMemory.get() && buffer.size() < 10) { + logger.debug("Setting autoread true"); + outOfMemory.set(false); + readController.setAutoRead(true); + } + RawFragmentBatch b = null; b = buffer.poll(); @@ -98,6 +114,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } } + if (b != null && b.getHeader().getIsOutOfMemory()) { + outOfMemory.set(true); + readController.setAutoRead(false); + return b; + } + + // if we are in the overlimit condition and aren't finished, check if we've passed the start limit. If so, turn off the overlimit condition and set auto read to true (start reading from socket again). if(!finished && overlimit.get()){ if(buffer.size() == startlimit){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index b263d2f..47f0342 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -178,7 +178,6 @@ public class TestSimpleExternalSort extends BaseTestQuery { } @Test - @Ignore public void outOfMemoryExternalSort() throws Throwable{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
