DRILL-648: Multiple sort failure in single mode when running TPCH queries

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/01bf8496
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/01bf8496
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/01bf8496

Branch: refs/heads/master
Commit: 01bf8496b217781521c943cc1a9a38ed9f841288
Parents: 5484a55
Author: Steven Phillips <[email protected]>
Authored: Tue May 27 15:26:14 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed May 28 09:18:19 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  |   7 +-
 .../drill/exec/planner/physical/SortPrel.java   |   2 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   3 +
 .../impl/xsort/TestSimpleExternalSort.java      | 101 ++++++++++++-------
 .../xsort/one_key_sort_descending_sv2.json      |  54 ++++++++++
 5 files changed, 126 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 4b6c37d..2289680 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
 import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.util.Utilities;
@@ -222,7 +223,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           w.start();
           sorter.sort(sv2);
 //          logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount());
-          batchGroups.add(new BatchGroup(new RecordBatchData(incoming).getContainer(), sv2));
+          RecordBatchData rbd = new RecordBatchData(incoming);
+          if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
+            rbd.setSv2(sv2);
+          }
+          batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
           batchesSinceLastSpill++;
           if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) {
             mergeAndSpill();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 464e1bb..3e1bcac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -100,7 +100,7 @@ public class SortPrel extends SortRel implements Prel {
 
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
-    return SelectionVectorMode.DEFAULT; // should support SV2 but there is a bug, DRILL-648
+    return SelectionVectorMode.NONE_AND_TWO;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index e7bc87d..a47796c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -159,6 +159,9 @@ public class BaseTestQuery extends ExecTest{
   protected void testPhysicalFromFile(String file) throws Exception{
     testPhysical(getFile(file));
   }
+  protected List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception {
+    return testRunAndReturn(QueryType.PHYSICAL, getFile(file));
+  }
   protected void testLogicalFromFile(String file) throws Exception{
     testLogical(getFile(file));
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 42fe703..221413e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -20,14 +20,13 @@ package org.apache.drill.exec.physical.impl.xsort;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
+import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.pop.PopUnitTestBase;
-import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
@@ -43,7 +42,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
-public class TestSimpleExternalSort extends PopUnitTestBase {
+public class TestSimpleExternalSort extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
   DrillConfig c = DrillConfig.create();
 
@@ -51,54 +50,78 @@ public class TestSimpleExternalSort extends PopUnitTestBase {
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
 
   @Test
-  public void sortOneKeyDescendingMergeSort() throws Throwable{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+  public void mergeSortWithSv2() throws Exception {
+    List<QueryResultBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json");
+    int count = 0;
+    for(QueryResultBatch b : results) {
+      if (b.getHeader().getRowCount() != 0)
+        count += b.getHeader().getRowCount();
+    }
+    assertEquals(500000, count);
 
-    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
-        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
-        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+    long previousBigInt = Long.MAX_VALUE;
 
-      bit1.run();
-      bit2.run();
-      client.connect();
-      List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
-              Files.toString(FileUtils.getResourceAsFile("/xsort/one_key_sort_descending.json"),
-                      Charsets.UTF_8));
-      int count = 0;
-      for(QueryResultBatch b : results) {
-        if (b.getHeader().getRowCount() != 0)
-          count += b.getHeader().getRowCount();
+    int recordCount = 0;
+    int batchCount = 0;
+
+    for (QueryResultBatch b : results) {
+      if (b.getHeader().getRowCount() == 0) break;
+      batchCount++;
+      RecordBatchLoader loader = new RecordBatchLoader(allocator);
+      loader.load(b.getHeader().getDef(),b.getData());
+      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class,
+              loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+
+
+      BigIntVector.Accessor a1 = c1.getAccessor();
+
+      for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+        recordCount++;
+        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+        previousBigInt = a1.get(i);
       }
-      assertEquals(1000000, count);
+      loader.clear();
+      b.release();
+    }
 
-      long previousBigInt = Long.MAX_VALUE;
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+  }
 
-      int recordCount = 0;
-      int batchCount = 0;
+  @Test
+  public void sortOneKeyDescendingMergeSort() throws Throwable{
+    List<QueryResultBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending.json");
+    int count = 0;
+    for(QueryResultBatch b : results) {
+      if (b.getHeader().getRowCount() != 0)
+        count += b.getHeader().getRowCount();
+    }
+    assertEquals(1000000, count);
 
-      for (QueryResultBatch b : results) {
-        if (b.getHeader().getRowCount() == 0) break;
-        batchCount++;
-        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
-        loader.load(b.getHeader().getDef(),b.getData());
-        BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+    long previousBigInt = Long.MAX_VALUE;
 
+    int recordCount = 0;
+    int batchCount = 0;
 
-        BigIntVector.Accessor a1 = c1.getAccessor();
-//        IntVector.Accessor a2 = c2.getAccessor();
+    for (QueryResultBatch b : results) {
+      if (b.getHeader().getRowCount() == 0) break;
+      batchCount++;
+      RecordBatchLoader loader = new RecordBatchLoader(allocator);
+      loader.load(b.getHeader().getDef(),b.getData());
+      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
 
-        for(int i =0; i < c1.getAccessor().getValueCount(); i++){
-          recordCount++;
-          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-          previousBigInt = a1.get(i);
-        }
-        loader.clear();
-        b.release();
-      }
 
-      System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+      BigIntVector.Accessor a1 = c1.getAccessor();
 
+      for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+        recordCount++;
+        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+        previousBigInt = a1.get(i);
+      }
+      loader.clear();
+      b.release();
     }
+
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json
new file mode 100644
index 0000000..d10aa96
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json
@@ -0,0 +1,54 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+                {records: 1000000, types: [
+                  {name: "blue", type: "INT", mode: "REQUIRED"},
+                  {name: "green", type: "INT", mode: "REQUIRED"}
+                ]}
+            ]
+        },
+        {
+            @id: 2,
+            pop: "project",
+            child: 1,
+            exprs: [
+              { ref: "blue", expr: "randomBigInt(100000)" }
+            ]
+        },
+        {
+            @id: 3,
+            pop: "filter",
+            expr: "alternate()",
+            child: 2
+        },
+        {
+            @id:4,
+            child: 3,
+            pop:"external-sort",
+            orderings: [
+              {expr: "blue", order : "DESC"}
+            ]
+        },
+        {
+            @id:5,
+            child: 4,
+            pop:"selection-vector-remover"
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to