Fix sort ordering in TopN by canonicalizing incoming batches
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0b296c82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0b296c82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0b296c82 Branch: refs/heads/master Commit: 0b296c82e5e7b07de1266221f42761a590a476fa Parents: 63d79a1 Author: Steven Phillips <[email protected]> Authored: Mon Jun 23 12:11:19 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 25 09:10:13 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/physical/impl/TopN/TopNBatch.java | 7 +++++-- .../drill/exec/physical/impl/sort/RecordBatchData.java | 6 ++++++ .../exec/physical/impl/sort/SortRecordBatchBuilder.java | 8 +++++++- exec/java-exec/src/main/resources/drill-module.conf | 2 +- 4 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 846d419..77be4ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -170,10 +170,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { case OK: countSincePurge += incoming.getRecordCount(); batchCount++; + RecordBatchData batch = new RecordBatchData(incoming); + batch.canonicalize(); if (priorityQueue == null) { - priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new 
ExpandableHyperContainer(incoming), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); } - priorityQueue.add(context, new RecordBatchData(incoming)); + priorityQueue.add(context, batch); if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) { purge(); countSincePurge = 0; @@ -242,6 +244,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); VectorContainer newQueue = new VectorContainer(); + builder.canonicalize(); builder.build(context, newQueue); priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent()); builder.getSv4().clear(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 9cb6f79..02cad5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -60,6 +60,12 @@ public class RecordBatchData { container.setRecordCount(recordCount); container.buildSchema(batch.getSchema().getSelectionVectorMode()); } + + public void canonicalize() { + SelectionVectorMode mode = container.getSchema().getSelectionVectorMode(); + container = VectorContainer.canonicalize(container); + container.buildSchema(mode); + } public int getRecordCount(){ return recordCount; 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index ba200f6..9626a97 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -112,6 +112,12 @@ public class SortRecordBatchBuilder { return true; } + public void canonicalize() { + for (RecordBatchData batch : batches.values()) { + batch.canonicalize(); + } + } + public boolean isEmpty(){ return batches.isEmpty(); } @@ -165,7 +171,7 @@ public class SortRecordBatchBuilder { } } - for(MaterializedField f : vectors.keySet()){ + for(MaterializedField f : schema){ List<ValueVector> v = vectors.get(f); outputContainer.addHyperList(v, false); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 7f399b2..1519327 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -38,7 +38,7 @@ drill.exec: { count: 7200, delay: 500 }, - threads: 1 + threads: 4 } }, use.ip : false
