Repository: incubator-drill Updated Branches: refs/heads/master 01bf8496b -> c8a08c3e7
DRILL-847: Handle different schemas for Merging receiver. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/602f8173 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/602f8173 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/602f8173 Branch: refs/heads/master Commit: 602f8173ee9852bb386b5b1751b919396e63387c Parents: 01bf849 Author: Jinfeng Ni <[email protected]> Authored: Tue May 27 11:07:36 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 28 13:15:52 2014 -0700 ---------------------------------------------------------------------- .../impl/mergereceiver/MergingRecordBatch.java | 29 ++++++++++++++++++++ .../drill/exec/record/RecordBatchLoader.java | 14 ++++++++++ .../drill/exec/record/VectorContainer.java | 26 +++++++++++++++++- 3 files changed, 68 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index e3f466a..9101202 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -61,6 +61,9 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.RelFieldCollation.Direction; +import parquet.Preconditions; + +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.sun.codemodel.JArray; import com.sun.codemodel.JClass; @@ -199,6 +202,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> ++i; } + // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath. + for (RecordBatchLoader loader : batchLoaders) { + loader.canonicalize(); + } + + // Ensure all the incoming batches have the identical schema. + if (!isSameSchemaAmongBatches(batchLoaders)) { + logger.error("Incoming batches for merging receiver have diffferent schemas!"); + context.fail(new SchemaChangeException("Incoming batches for merging receiver have diffferent schemas!")); + return IterOutcome.STOP; + } + // create the outgoing schema and vector container, and allocate the initial batch SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); int vectorCount = 0; @@ -397,6 +412,20 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> return WritableBatch.get(this); } + private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) { + Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!"); + + BatchSchema schema = batchLoaders[0].getSchema(); + + for (int i = 1; i < batchLoaders.length; i++) { + if (!schema.equals(batchLoaders[i].getSchema())) { + logger.error("Schemas are different. Schema 1 : " + schema + ", Schema 2: " + batchLoaders[i].getSchema() ); + return false; + } + } + return true; + } + private void allocateOutgoing() { for (VectorAllocator allocator : allocators) { allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 10d959f..e32fda9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -158,4 +158,18 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp container.clear(); } + public void canonicalize() { + //logger.debug( "RecordBatchLoader : before schema " + schema); + container = VectorContainer.canonicalize(container); + + // rebuild the schema. + SchemaBuilder b = BatchSchema.newBuilder(); + for(VectorWrapper<?> v : container){ + b.addField(v.getField()); + } + b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); + this.schema = b.build(); + + //logger.debug( "RecordBatchLoader : after schema " + schema); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index dd62c67..3c67466 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -85,6 +88,25 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return vc; } + public static VectorContainer canonicalize(VectorContainer original) { + VectorContainer vc = new VectorContainer(); + + List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers); + + // Sort list of VectorWrapper alphabetically based on SchemaPath. + Collections.sort(canonicalWrappers, new Comparator<VectorWrapper<?>>() { + public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) { + return v1.getField().getPath().toExpr().compareTo(v2.getField().getPath().toExpr()); + } + }); + + for (VectorWrapper<?> w : canonicalWrappers) { + vc.add(w.getValueVector()); + } + return vc; + } + + private void cloneAndTransfer(VectorWrapper<?> wrapper) { wrappers.add(wrapper.cloneAndTransfer()); } @@ -160,7 +182,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto clazz.getCanonicalName(), va.getVectorClass().getCanonicalName())); } - return (VectorWrapper<?>) va.getChildWrapper(fieldIds); + return va.getChildWrapper(fieldIds); } @@ -212,4 +234,6 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto public int getNumberOfColumns() { return this.wrappers.size(); } + + }
