DRILL-909: Handle OOM in UnlimitedRawBatchBuffer

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d9a2f1c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d9a2f1c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d9a2f1c9

Branch: refs/heads/master
Commit: d9a2f1c9ac45ff31742282b6319891bec08de745
Parents: 7c1ee01
Author: Steven Phillips <[email protected]>
Authored: Wed Jun 11 18:57:08 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 11 21:25:16 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  |  2 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     | 23 ++++++++++++++++++++
 .../impl/xsort/TestSimpleExternalSort.java      |  1 -
 3 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index d4c0b25..5582742 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -243,7 +243,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 //          logger.debug("Took {} us to sort {} records", t, count);
           break;
         case OUT_OF_MEMORY:
-          mergeAndSpill();
+          if (batchesSinceLastSpill > 2) mergeAndSpill();
           batchesSinceLastSpill = 0;
           break;
         default:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index a726a82..d14e50c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -34,6 +34,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private final int softlimit;
   private final int startlimit;
   private final AtomicBoolean overlimit = new AtomicBoolean(false);
+  private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ReadController readController;
   private final boolean multiFragment;
 
@@ -57,6 +58,15 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void enqueue(RawFragmentBatch batch) {
+    if (batch.getHeader().getIsOutOfMemory()) {
+      logger.debug("Setting autoread false");
+      readController.setAutoRead(false);
+      if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) {
+        buffer.addFirst(batch);
+      }
+      outOfMemory.set(true);
+      return;
+    }
     buffer.add(batch);
     if(buffer.size() == softlimit){
       overlimit.set(true);
@@ -85,6 +95,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   @Override
   public RawFragmentBatch getNext(){
 
+    if (outOfMemory.get() && buffer.size() < 10) {
+      logger.debug("Setting autoread true");
+      outOfMemory.set(false);
+      readController.setAutoRead(true);
+    }
+
     RawFragmentBatch b = null;
 
     b = buffer.poll();
@@ -98,6 +114,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       }
     }
 
+    if (b != null && b.getHeader().getIsOutOfMemory()) {
+      outOfMemory.set(true);
+      readController.setAutoRead(false);
+      return b;
+    }
+
+
     // if we are in the overlimit condition and aren't finished, check if we've passed the start limit.  If so, turn off the overlimit condition and set auto read to true (start reading from socket again).
     if(!finished && overlimit.get()){
       if(buffer.size() == startlimit){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d9a2f1c9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index b263d2f..47f0342 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -178,7 +178,6 @@ public class TestSimpleExternalSort extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void outOfMemoryExternalSort() throws Throwable{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
 

Reply via email to