DRILL-6230: Extend row set readers to handle hyper vectors closes #1161
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4f2182e4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4f2182e4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4f2182e4 Branch: refs/heads/master Commit: 4f2182e41f4474ca42ae6d572a9c5d5ff274d984 Parents: 127e415 Author: Paul Rogers <prog...@cloudera.com> Authored: Sat Mar 10 23:43:36 2018 -0800 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Fri Apr 6 12:05:14 2018 +0300 ---------------------------------------------------------------------- .../rowSet/model/AbstractReaderBuilder.java | 45 ++ .../exec/physical/rowSet/model/ReaderIndex.java | 28 +- .../physical/rowSet/model/SchemaInference.java | 62 --- .../rowSet/model/hyper/BaseReaderBuilder.java | 142 ++++-- .../model/hyper/HyperSchemaInference.java | 72 +++ .../rowSet/model/single/BaseReaderBuilder.java | 81 ++-- .../rowSet/model/single/DirectRowIndex.java | 40 ++ .../model/single/SingleSchemaInference.java | 65 +++ .../physical/rowSet/impl/RowSetTestUtils.java | 59 +++ .../impl/TestResultSetLoaderMapArray.java | 135 +++--- .../rowSet/impl/TestResultSetLoaderMaps.java | 193 +++++--- .../impl/TestResultSetLoaderOmittedValues.java | 3 +- .../impl/TestResultSetLoaderOverflow.java | 173 ++++--- .../impl/TestResultSetLoaderProjection.java | 139 +----- .../impl/TestResultSetLoaderProtocol.java | 2 +- .../rowSet/impl/TestResultSetLoaderTorture.java | 16 +- .../drill/exec/record/TestVectorContainer.java | 3 +- .../apache/drill/test/rowSet/DirectRowSet.java | 31 +- .../drill/test/rowSet/HyperRowSetImpl.java | 102 +++- .../drill/test/rowSet/IndirectRowSet.java | 14 +- .../org/apache/drill/test/rowSet/RowSet.java | 7 + .../drill/test/rowSet/RowSetComparison.java | 73 +-- .../apache/drill/test/rowSet/RowSetPrinter.java | 18 +- .../apache/drill/test/rowSet/RowSetReader.java | 11 +- .../drill/test/rowSet/RowSetReaderImpl.java | 24 +- 
.../drill/test/rowSet/schema/UnionBuilder.java | 4 +- .../drill/test/rowSet/test/RowSetTest.java | 268 +++++++++-- .../drill/test/rowSet/test/TestFillEmpties.java | 12 +- .../rowSet/test/TestHyperVectorReaders.java | 365 ++++++++++++++ .../test/rowSet/test/TestScalarAccessors.java | 474 +++++++++++++++---- .../drill/test/rowSet/test/VectorPrinter.java | 72 --- .../src/main/java/io/netty/buffer/DrillBuf.java | 15 + .../main/codegen/templates/ColumnAccessors.java | 251 +++++----- .../drill/exec/vector/accessor/ArrayReader.java | 40 +- .../exec/vector/accessor/ColumnReader.java | 83 ++++ .../exec/vector/accessor/ColumnReaderIndex.java | 102 +++- .../exec/vector/accessor/ObjectReader.java | 26 +- .../vector/accessor/ScalarElementReader.java | 65 --- .../exec/vector/accessor/ScalarReader.java | 13 +- .../drill/exec/vector/accessor/TupleReader.java | 31 +- .../accessor/UnsupportedConversionError.java | 52 ++ .../drill/exec/vector/accessor/ValueType.java | 57 ++- .../vector/accessor/impl/VectorPrinter.java | 72 +++ .../exec/vector/accessor/package-info.java | 184 +++++++ .../accessor/reader/AbstractArrayReader.java | 188 -------- .../accessor/reader/AbstractObjectReader.java | 21 +- .../accessor/reader/AbstractScalarReader.java | 205 ++++++++ .../accessor/reader/AbstractTupleReader.java | 85 ++-- .../vector/accessor/reader/ArrayReaderImpl.java | 357 ++++++++++++++ .../accessor/reader/BaseElementReader.java | 187 -------- .../accessor/reader/BaseScalarReader.java | 198 ++++---- .../accessor/reader/ColumnReaderFactory.java | 52 +- .../reader/FixedWidthElementReaderIndex.java | 38 -- .../exec/vector/accessor/reader/MapReader.java | 49 +- .../vector/accessor/reader/NullStateReader.java | 52 ++ .../accessor/reader/NullStateReaders.java | 193 ++++++++ .../accessor/reader/ObjectArrayReader.java | 159 ------- .../accessor/reader/OffsetVectorReader.java | 70 +++ .../vector/accessor/reader/ReaderEvents.java | 31 ++ .../accessor/reader/ScalarArrayReader.java | 102 ---- 
.../vector/accessor/reader/VectorAccessor.java | 5 +- .../vector/accessor/reader/VectorAccessors.java | 344 ++++++++++++++ .../vector/accessor/reader/package-info.java | 65 ++- .../accessor/writer/ColumnWriterFactory.java | 4 +- .../drill/exec/vector/complex/ListVector.java | 2 +- 65 files changed, 4122 insertions(+), 2009 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/AbstractReaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/AbstractReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/AbstractReaderBuilder.java new file mode 100644 index 0000000..4bcff72 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/AbstractReaderBuilder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.model; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; +import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl; +import org.apache.drill.exec.vector.accessor.reader.BaseScalarReader; +import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessor; + +public abstract class AbstractReaderBuilder { + + protected AbstractObjectReader buildScalarReader(VectorAccessor va, ColumnMetadata schema) { + BaseScalarReader scalarReader = ColumnReaderFactory.buildColumnReader(va); + DataMode mode = va.type().getMode(); + switch (mode) { + case OPTIONAL: + return BaseScalarReader.buildOptional(schema, va, scalarReader); + case REQUIRED: + return BaseScalarReader.buildRequired(schema, va, scalarReader); + case REPEATED: + return ArrayReaderImpl.buildScalar(schema, va, scalarReader); + default: + throw new UnsupportedOperationException(mode.toString()); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java index c4b0415..633ce9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java @@ -28,26 +28,30 @@ import org.apache.drill.exec.vector.accessor.ColumnReaderIndex; public abstract class ReaderIndex implements ColumnReaderIndex { - protected int rowIndex = -1; + protected int position = 
-1; protected final int rowCount; public ReaderIndex(int rowCount) { this.rowCount = rowCount; } - public int position() { return rowIndex; } - public void set(int index) { rowIndex = index; } + public void set(int index) { + assert position >= -1 && position <= rowCount; + position = index; + } + + @Override + public int logicalIndex() { return position; } + + @Override + public int size() { return rowCount; } + @Override public boolean next() { - if (++rowIndex < rowCount ) { + if (++position < rowCount) { return true; - } else { - rowIndex--; - return false; } + position = rowCount; + return false; } - - public int size() { return rowCount; } - - public boolean valid() { return rowIndex < rowCount; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java deleted file mode 100644 index 9096ec2..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.rowSet.model; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.record.metadata.MetadataUtils; -import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.exec.record.metadata.TupleSchema; - -/** - * Produce a metadata schema from a vector container. Used when given a - * record batch without metadata. 
- */ - -public class SchemaInference { - - public TupleMetadata infer(VectorContainer container) { - List<ColumnMetadata> columns = new ArrayList<>(); - for (int i = 0; i < container.getNumberOfColumns(); i++) { - MaterializedField field = container.getValueVector(i).getField(); - columns.add(inferVector(field)); - } - return MetadataUtils.fromColumns(columns); - } - - private ColumnMetadata inferVector(MaterializedField field) { - if (field.getType().getMinorType() == MinorType.MAP) { - return MetadataUtils.newMap(field, inferMapSchema(field)); - } else { - return MetadataUtils.fromField(field); - } - } - - private TupleSchema inferMapSchema(MaterializedField field) { - List<ColumnMetadata> columns = new ArrayList<>(); - for (MaterializedField child : field.getChildren()) { - columns.add(inferVector(child)); - } - return MetadataUtils.fromColumns(columns); - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java index ee856be..c636cb6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java @@ -21,27 +21,54 @@ import java.util.ArrayList; import java.util.List; import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; -import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip; +import 
org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.physical.rowSet.model.AbstractReaderBuilder; import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; -import org.apache.drill.exec.record.HyperVectorWrapper; -import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ColumnReaderIndex; import org.apache.drill.exec.vector.accessor.impl.AccessorUtilities; import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; -import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory; +import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl; import org.apache.drill.exec.vector.accessor.reader.MapReader; -import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader; import org.apache.drill.exec.vector.accessor.reader.VectorAccessor; -import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessors; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.BaseHyperVectorAccessor; + +/** + * Base reader builder for a hyper-batch. The semantics of hyper-batches are + * a bit rough. When a single batch, we can walk the vector tree to get the + * information we need. But, hyper vector wrappers don't provide that same + * information, so we can't just walk them. Further, the code that builds + * hyper-batches appears perfectly happy to accept batches with differing + * schemas, something that will cause the readers to blow up because they + * must commit to a particular kind of reader for each vector. 
+ * <p> + * The solution is to build the readers in two passes. The first builds a + * metadata model for each batch and merges those models. (This version + * requires strict identity in schemas; a fancier solution could handle, + * say, the addition of map members in one batch vs. another or the addition + * of union/list members across batches.) + * <p> + * The metadata (by design) has the information we need, so in the second pass + * we walk the metadata hierarchy and build up readers from that, creating + * vector accessors as we go to provide a runtime path from the root vectors + * (selected by the SV4) to the inner vectors (which are not represented as + * hypervectors.) + * <p> + * The hypervector wrapper mechanism provides a crude way to handle inner + * vectors, but it is awkward, and does not lend itself to the kind of caching + * we'd like for performance, so we use our own accessors for inner vectors. + * The outermost hyper vector accessors wrap a hyper vector wrapper. Inner + * accessors directly navigate at the vector level (from a vector provided by + * the outer vector accessor.) + */ -public abstract class BaseReaderBuilder { +public abstract class BaseReaderBuilder extends AbstractReaderBuilder { /** * Read-only row index into the hyper row set with batch and index @@ -58,13 +85,13 @@ public abstract class BaseReaderBuilder { } @Override - public int vectorIndex() { - return AccessorUtilities.sv4Index(sv4.get(rowIndex)); + public int offset() { + return AccessorUtilities.sv4Index(sv4.get(position)); } @Override - public int batchIndex( ) { - return AccessorUtilities.sv4Batch(sv4.get(rowIndex)); + public int hyperVectorIndex( ) { + return AccessorUtilities.sv4Batch(sv4.get(position)); } } @@ -72,14 +99,18 @@ public abstract class BaseReaderBuilder { * Vector accessor used by the column accessors to obtain the vector for * each column value. That is, position 0 might be batch 4, index 3, * while position 1 might be batch 1, index 7, and so on. 
+ * <p> + * Must be here: the reader layer is in the <tt>vector</tt> package + * and does not have visibility to <tt>java-exec</tt> classes. */ - public static class HyperVectorAccessor implements VectorAccessor { + public static class HyperVectorAccessor extends BaseHyperVectorAccessor { private final ValueVector[] vectors; private ColumnReaderIndex rowIndex; public HyperVectorAccessor(VectorWrapper<?> vw) { + super(vw.getField().getType()); vectors = vw.getValueVectors(); } @@ -88,61 +119,70 @@ public abstract class BaseReaderBuilder { rowIndex = index; } + @SuppressWarnings("unchecked") @Override - public ValueVector vector() { - return vectors[rowIndex.batchIndex()]; + public <T extends ValueVector> T vector() { + return (T) vectors[rowIndex.hyperVectorIndex()]; } } + protected List<AbstractObjectReader> buildContainerChildren( + VectorContainer container) throws SchemaChangeException { + TupleMetadata schema = new HyperSchemaInference().infer(container); + return buildContainerChildren(container, schema); + } - protected AbstractObjectReader[] buildContainerChildren( - VectorContainer container, MetadataProvider mdProvider) { + protected List<AbstractObjectReader> buildContainerChildren( + VectorContainer container, TupleMetadata schema) { List<AbstractObjectReader> readers = new ArrayList<>(); for (int i = 0; i < container.getNumberOfColumns(); i++) { VectorWrapper<?> vw = container.getValueVector(i); - VectorDescrip descrip = new VectorDescrip(mdProvider, i, vw.getField()); - readers.add(buildVectorReader(vw, descrip)); + VectorAccessor va = new HyperVectorAccessor(vw); + readers.add(buildVectorReader(va, schema.metadata(i))); } - return readers.toArray(new AbstractObjectReader[readers.size()]); + return readers; } - @SuppressWarnings("unchecked") - private AbstractObjectReader buildVectorReader(VectorWrapper<?> vw, VectorDescrip descrip) { - MajorType type = vw.getField().getType(); - if (type.getMinorType() == MinorType.MAP) { - if (type.getMode() == 
DataMode.REPEATED) { - return buildMapArrayReader((HyperVectorWrapper<? extends AbstractMapVector>) vw, descrip); - } else { - return buildMapReader((HyperVectorWrapper<? extends AbstractMapVector>) vw, descrip); - } - } else { - return buildPrimitiveReader(vw, descrip); + protected AbstractObjectReader buildVectorReader(VectorAccessor va, ColumnMetadata metadata) { + switch(metadata.type()) { + case MAP: + return buildMap(va, metadata.mode(), metadata); + default: + return buildScalarReader(va, metadata); } } - private AbstractObjectReader buildMapArrayReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) { - AbstractObjectReader mapReader = MapReader.build(descrip.metadata, buildMap(vectors, descrip)); - return ObjectArrayReader.build(new HyperVectorAccessor(vectors), mapReader); - } + private AbstractObjectReader buildMap(VectorAccessor va, DataMode mode, ColumnMetadata metadata) { - private AbstractObjectReader buildMapReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) { - return MapReader.build(descrip.metadata, buildMap(vectors, descrip)); - } + boolean isArray = mode == DataMode.REPEATED; + + // Map type - private AbstractObjectReader buildPrimitiveReader(VectorWrapper<?> vw, VectorDescrip descrip) { - return ColumnReaderFactory.buildColumnReader( - vw.getField().getType(), new HyperVectorAccessor(vw)); + AbstractObjectReader mapReader = MapReader.build( + metadata, + isArray ? null : va, + buildMapMembers(va, metadata.mapSchema())); + + // Single map + + if (! isArray) { + return mapReader; + } + + // Repeated map + + return ArrayReaderImpl.buildTuple(metadata, va, mapReader); } - private List<AbstractObjectReader> buildMap(HyperVectorWrapper<? 
extends AbstractMapVector> vectors, VectorDescrip descrip) { + protected List<AbstractObjectReader> buildMapMembers(VectorAccessor va, TupleMetadata mapSchema) { List<AbstractObjectReader> readers = new ArrayList<>(); - MetadataProvider provider = descrip.parent.childProvider(descrip.metadata); - MaterializedField mapField = vectors.getField(); - for (int i = 0; i < mapField.getChildren().size(); i++) { - HyperVectorWrapper<? extends ValueVector> child = (HyperVectorWrapper<? extends ValueVector>) vectors.getChildWrapper(new int[] {i}); - VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField()); - readers.add(buildVectorReader(child, childDescrip)); - i++; + for (int i = 0; i < mapSchema.size(); i++) { + ColumnMetadata member = mapSchema.metadata(i); + // Does not use the hyper-vector mechanism. + + readers.add(buildVectorReader( + new VectorAccessors.MapMemberHyperVectorAccessor(va, i, member.majorType()), + member)); } return readers; } http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/HyperSchemaInference.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/HyperSchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/HyperSchemaInference.java new file mode 100644 index 0000000..3e14866 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/HyperSchemaInference.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model.hyper; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.record.metadata.VariantMetadata; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Infer the schema for a hyperbatch. Scans each vector of each batch + * to ensure that the vectors are compatible. (They should be.) + * <p> + * This code should be extended to handle merges. For example, batch + * 1 may have a union with type INT. Batch 2 might have a union with + * VARCHAR. The combined schema should have (INT, VARCHAR). The same + * is true with (non-repeated) lists. There may be other cases. 
+ */ + +public class HyperSchemaInference { + + public TupleMetadata infer(VectorContainer container) throws SchemaChangeException { + TupleSchema schema = new TupleSchema(); + for (int i = 0; i < container.getNumberOfColumns(); i++) { + VectorWrapper<?> vw = container.getValueVector(i); + schema.addColumn(buildColumn(vw)); + } + return schema; + } + + private ColumnMetadata buildColumn(VectorWrapper<?> vw) throws SchemaChangeException { + ColumnMetadata commonSchema = null; + for (ValueVector vector : vw.getValueVectors()) { + ColumnMetadata mapSchema = MetadataUtils.fromField(vector.getField()); + if (commonSchema == null) { + commonSchema = mapSchema; + } else if (! commonSchema.isEquivalent(mapSchema)) { + throw new SchemaChangeException("Maps are not consistent"); + } + } + + // Special handling of lists and unions + + if (commonSchema.isVariant()) { + VariantMetadata variantSchema = commonSchema.variantSchema(); + if (variantSchema.size() == 1) { + variantSchema.becomeSimple(); + } + } + return commonSchema; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java index 80ad19f..4539dbb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java @@ -22,68 +22,81 @@ import java.util.List; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; +import 
org.apache.drill.exec.physical.rowSet.model.AbstractReaderBuilder; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; -import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory; +import org.apache.drill.exec.vector.accessor.reader.AbstractScalarReader; +import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl; import org.apache.drill.exec.vector.accessor.reader.MapReader; -import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessor; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.SingleVectorAccessor; import org.apache.drill.exec.vector.complex.AbstractMapVector; -import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.drill.exec.vector.complex.RepeatedMapVector; -public abstract class BaseReaderBuilder { +public abstract class BaseReaderBuilder extends AbstractReaderBuilder { protected List<AbstractObjectReader> buildContainerChildren( VectorContainer container, MetadataProvider mdProvider) { - List<AbstractObjectReader> writers = new ArrayList<>(); + List<AbstractObjectReader> readers = new ArrayList<>(); for (int i = 0; i < container.getNumberOfColumns(); i++) { - @SuppressWarnings("resource") ValueVector vector = container.getValueVector(i).getValueVector(); VectorDescrip descrip = new VectorDescrip(mdProvider, i, vector.getField()); - writers.add(buildVectorReader(vector, descrip)); + readers.add(buildVectorReader(vector, descrip)); } - return writers; + return readers; } - private AbstractObjectReader buildVectorReader(ValueVector vector, VectorDescrip descrip) { - MajorType type = vector.getField().getType(); - if (type.getMinorType() == 
MinorType.MAP) { - if (type.getMode() == DataMode.REPEATED) { - return buildMapArrayReader((RepeatedMapVector) vector, descrip); - } else { - return buildMapReader((MapVector) vector, descrip); - } - } else { - return buildPrimitiveReader(vector, descrip); + protected AbstractObjectReader buildVectorReader(ValueVector vector, VectorDescrip descrip) { + VectorAccessor va = new SingleVectorAccessor(vector); + MajorType type = va.type(); + + switch(type.getMinorType()) { + case MAP: + return buildMap((AbstractMapVector) vector, va, type.getMode(), descrip); + case LATE: + + // Occurs for a list with no type: a list of nulls. + + return AbstractScalarReader.nullReader(descrip.metadata); + default: + return buildScalarReader(va, descrip.metadata); } } - private AbstractObjectReader buildMapArrayReader(RepeatedMapVector vector, VectorDescrip descrip) { - AbstractObjectReader mapReader = MapReader.build(descrip.metadata, buildMap(vector, descrip)); - return ObjectArrayReader.build(vector, mapReader); - } + private AbstractObjectReader buildMap(AbstractMapVector vector, VectorAccessor va, DataMode mode, VectorDescrip descrip) { - private AbstractObjectReader buildMapReader(MapVector vector, VectorDescrip descrip) { - return MapReader.build(descrip.metadata, buildMap(vector, descrip)); - } + boolean isArray = mode == DataMode.REPEATED; - private AbstractObjectReader buildPrimitiveReader(ValueVector vector, VectorDescrip descrip) { - return ColumnReaderFactory.buildColumnReader(vector); + // Map type + + AbstractObjectReader mapReader = MapReader.build( + descrip.metadata, + isArray ? null : va, + buildMapMembers(vector, + descrip.parent.childProvider(descrip.metadata))); + + // Single map + + if (! 
isArray) { + return mapReader; + } + + // Repeated map + + return ArrayReaderImpl.buildTuple(descrip.metadata, va, mapReader); } - private List<AbstractObjectReader> buildMap(AbstractMapVector vector, VectorDescrip descrip) { + protected List<AbstractObjectReader> buildMapMembers(AbstractMapVector mapVector, MetadataProvider provider) { List<AbstractObjectReader> readers = new ArrayList<>(); - MetadataProvider provider = descrip.parent.childProvider(descrip.metadata); int i = 0; - for (ValueVector child : vector) { - VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField()); - readers.add(buildVectorReader(child, childDescrip)); + for (ValueVector vector : mapVector) { + VectorDescrip descrip = new VectorDescrip(provider, i, vector.getField()); + readers.add(buildVectorReader(vector, descrip)); i++; } return readers; } + } http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/DirectRowIndex.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/DirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/DirectRowIndex.java new file mode 100644 index 0000000..c50487f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/DirectRowIndex.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; + +/** + * Reader index that points directly to each row in the row set. + * This index starts with pointing to the -1st row, so that the + * reader can require a <tt>next()</tt> for every row, including + * the first. (This is the JDBC <tt>RecordSet</tt> convention.) + */ + +public class DirectRowIndex extends ReaderIndex { + + public DirectRowIndex(int rowCount) { + super(rowCount); + } + + @Override + public int offset() { return position; } + + @Override + public int hyperVectorIndex() { return 0; } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/SingleSchemaInference.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/SingleSchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/SingleSchemaInference.java new file mode 100644 index 0000000..b05cb83 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/SingleSchemaInference.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.AbstractColumnMetadata; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; + +/** + * Produce a metadata schema from a vector container. Used when given a + * record batch without metadata. 
+ */ + +public class SingleSchemaInference { + + public TupleMetadata infer(VectorContainer container) { + List<ColumnMetadata> columns = new ArrayList<>(); + for (int i = 0; i < container.getNumberOfColumns(); i++) { + columns.add(inferVector(container.getValueVector(i).getValueVector())); + } + return MetadataUtils.fromColumns(columns); + } + + private AbstractColumnMetadata inferVector(ValueVector vector) { + MaterializedField field = vector.getField(); + switch (field.getType().getMinorType()) { + case MAP: + return MetadataUtils.newMap(field, inferMapSchema((AbstractMapVector) vector)); + default: + return MetadataUtils.fromField(field); + } + } + + private TupleSchema inferMapSchema(AbstractMapVector vector) { + List<ColumnMetadata> columns = new ArrayList<>(); + for (int i = 0; i < vector.getField().getChildren().size(); i++) { + columns.add(inferVector(vector.getChildByOrdinal(i))); + } + return MetadataUtils.fromColumns(columns); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java new file mode 100644 index 0000000..c0e8f72 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; + +import com.google.common.collect.Lists; + +public class RowSetTestUtils { + + private RowSetTestUtils() { } + + public static List<SchemaPath> projectList(String... names) { + List<SchemaPath> selected = new ArrayList<>(); + for (String name: names) { + + // Parse from string does not handle wildcards. + + if (name.equals(SchemaPath.DYNAMIC_STAR)) { + selected.add(SchemaPath.STAR_COLUMN); + } else { + selected.add(SchemaPath.parseFromString(name)); + } + } + return selected; + } + + public static List<SchemaPath> projectCols(SchemaPath... 
cols) { + List<SchemaPath> selected = new ArrayList<>(); + for (SchemaPath col: cols) { + selected.add(col); + } + return selected; + } + + public static List<SchemaPath> projectAll() { + return Lists.newArrayList( + new SchemaPath[] {SchemaPath.getSimplePath(SchemaPath.DYNAMIC_STAR)}); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java index b2e3668..a3f1754 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java @@ -17,7 +17,8 @@ */ package org.apache.drill.exec.physical.rowSet.impl; -import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,15 +30,16 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ArrayReader; import org.apache.drill.exec.vector.accessor.ArrayWriter; -import 
org.apache.drill.exec.vector.accessor.ScalarElementReader; import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.TupleReader; import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; @@ -57,6 +59,7 @@ import org.junit.Test; public class TestResultSetLoaderMapArray extends SubOperatorTest { + @SuppressWarnings("resource") @Test public void testBasics() { TupleMetadata schema = new SchemaBuilder() @@ -85,28 +88,32 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { rsLoader.startBatch(); rootWriter - .addRow(10, objArray( - objArray(110, "d1.1"), - objArray(120, "d2.2"))) - .addRow(20, objArray()) - .addRow(30, objArray( - objArray(310, "d3.1"), - objArray(320, "d3.2"), - objArray(330, "d3.3"))) + .addRow(10, mapArray( + mapValue(110, "d1.1"), + mapValue(120, "d2.2"))) + .addRow(20, mapArray()) + .addRow(30, mapArray( + mapValue(310, "d3.1"), + mapValue(320, "d3.2"), + mapValue(330, "d3.3"))) ; // Verify the first batch RowSet actual = fixture.wrap(rsLoader.harvest()); + RepeatedMapVector mapVector = (RepeatedMapVector) actual.container().getValueVector(1).getValueVector(); + MaterializedField mapField = mapVector.getField(); + assertEquals(2, mapField.getChildren().size()); + SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray( - objArray(110, "d1.1"), - objArray(120, "d2.2"))) - .addRow(20, objArray()) - .addRow(30, objArray( - objArray(310, "d3.1"), - objArray(320, "d3.2"), - objArray(330, "d3.3"))) + .addRow(10, mapArray( + mapValue(110, "d1.1"), + mapValue(120, "d2.2"))) + .addRow(20, mapArray()) + .addRow(30, mapArray( + mapValue(310, "d3.1"), + mapValue(320, "d3.2"), + mapValue(330, "d3.3"))) .build(); 
new RowSetComparison(expected).verifyAndClearAll(actual); @@ -115,26 +122,30 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { rsLoader.startBatch(); rootWriter - .addRow(40, objArray( - objArray(410, "d4.1"), - objArray(420, "d4.2"))); + .addRow(40, mapArray( + mapValue(410, "d4.1"), + mapValue(420, "d4.2"))); TupleWriter mapWriter = rootWriter.array("m").tuple(); mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL)); rootWriter - .addRow(50, objArray( - objArray(510, "d5.1", "e5.1"), - objArray(520, "d5.2", null))) - .addRow(60, objArray( - objArray(610, "d6.1", "e6.1"), - objArray(620, "d6.2", null), - objArray(630, "d6.3", "e6.3"))) + .addRow(50, mapArray( + mapValue(510, "d5.1", "e5.1"), + mapValue(520, "d5.2", null))) + .addRow(60, mapArray( + mapValue(610, "d6.1", "e6.1"), + mapValue(620, "d6.2", null), + mapValue(630, "d6.3", "e6.3"))) ; // Verify the second batch actual = fixture.wrap(rsLoader.harvest()); + mapVector = (RepeatedMapVector) actual.container().getValueVector(1).getValueVector(); + mapField = mapVector.getField(); + assertEquals(3, mapField.getChildren().size()); + TupleMetadata expectedSchema = new SchemaBuilder() .add("a", MinorType.INT) .addMapArray("m") @@ -144,16 +155,16 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { .resumeSchema() .buildSchema(); expected = fixture.rowSetBuilder(expectedSchema) - .addRow(40, objArray( - objArray(410, "d4.1", null), - objArray(420, "d4.2", null))) - .addRow(50, objArray( - objArray(510, "d5.1", "e5.1"), - objArray(520, "d5.2", null))) - .addRow(60, objArray( - objArray(610, "d6.1", "e6.1"), - objArray(620, "d6.2", null), - objArray(630, "d6.3", "e6.3"))) + .addRow(40, mapArray( + mapValue(410, "d4.1", null), + mapValue(420, "d4.2", null))) + .addRow(50, mapArray( + mapValue(510, "d5.1", "e5.1"), + mapValue(520, "d5.2", null))) + .addRow(60, mapArray( + mapValue(610, "d6.1", "e6.1"), + mapValue(620, "d6.2", null), + 
mapValue(630, "d6.3", "e6.3"))) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -181,28 +192,28 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { rsLoader.startBatch(); rootWriter - .addRow(10, objArray( - objArray(110, strArray("d1.1.1", "d1.1.2")), - objArray(120, strArray("d1.2.1", "d1.2.2")))) - .addRow(20, objArray()) - .addRow(30, objArray( - objArray(310, strArray("d3.1.1", "d3.2.2")), - objArray(320, strArray()), - objArray(330, strArray("d3.3.1", "d1.2.2")))) + .addRow(10, mapArray( + mapValue(110, strArray("d1.1.1", "d1.1.2")), + mapValue(120, strArray("d1.2.1", "d1.2.2")))) + .addRow(20, mapArray()) + .addRow(30, mapArray( + mapValue(310, strArray("d3.1.1", "d3.2.2")), + mapValue(320, strArray()), + mapValue(330, strArray("d3.3.1", "d1.2.2")))) ; // Verify the batch RowSet actual = fixture.wrap(rsLoader.harvest()); SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray( - objArray(110, strArray("d1.1.1", "d1.1.2")), - objArray(120, strArray("d1.2.1", "d1.2.2")))) - .addRow(20, objArray()) - .addRow(30, objArray( - objArray(310, strArray("d3.1.1", "d3.2.2")), - objArray(320, strArray()), - objArray(330, strArray("d3.3.1", "d1.2.2")))) + .addRow(10, mapArray( + mapValue(110, strArray("d1.1.1", "d1.1.2")), + mapValue(120, strArray("d1.2.1", "d1.2.2")))) + .addRow(20, mapArray()) + .addRow(30, mapArray( + mapValue(310, strArray("d3.1.1", "d3.2.2")), + mapValue(320, strArray()), + mapValue(330, strArray("d3.3.1", "d1.2.2")))) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -270,21 +281,23 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { ArrayReader a2Reader = m1Reader.array("m2"); TupleReader m2Reader = a2Reader.tuple(); ScalarReader cReader = m2Reader.scalar("c"); - ScalarElementReader dReader = m2Reader.elements("d"); + ArrayReader dArray = m2Reader.array("d"); + ScalarReader dReader = dArray.scalar(); for (int i = 0; i < 5; i++) { - reader.next(); + 
assertTrue(reader.next()); assertEquals(i, aReader.getInt()); for (int j = 0; j < 4; j++) { - a1Reader.setPosn(j); + assertTrue(a1Reader.next()); int a1Key = i + 10 + j; assertEquals(a1Key, bReader.getInt()); for (int k = 0; k < 3; k++) { - a2Reader.setPosn(k); + assertTrue(a2Reader.next()); int a2Key = a1Key * 10 + k; assertEquals(a2Key, cReader.getInt()); for (int l = 0; l < 2; l++) { - assertEquals("d-" + (a2Key * 10 + l), dReader.getString(l)); + assertTrue(dArray.next()); + assertEquals("d-" + (a2Key * 10 + l), dReader.getString()); } } } @@ -357,7 +370,7 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { assertEquals(rowId * 100, reader.scalar("a").getInt()); assertEquals(10, maReader.size()); for (int i = 0; i < 10; i++) { - maReader.setPosn(i); + assertTrue(maReader.next()); assertEquals(rowId * 1000 + i, mReader.scalar("b").getInt()); assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes())); } @@ -426,7 +439,7 @@ public class TestResultSetLoaderMapArray extends SubOperatorTest { } assertEquals(entryCount, maReader.size()); for (int j = 0; j < entryCount; j++) { - maReader.setPosn(j); + assertTrue(maReader.next()); if (j % entrySkip == 0) { assertTrue(mReader.scalar(0).isNull()); assertTrue(mReader.scalar(1).isNull()); http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java index 11f449b..10dbbe1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java @@ -18,6 +18,7 @@ package
org.apache.drill.exec.physical.rowSet.impl; import static org.apache.drill.test.rowSet.RowSetUtilities.intArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; import static org.junit.Assert.assertEquals; @@ -32,11 +33,13 @@ import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.RowSetLoader; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.TupleReader; import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; @@ -45,6 +48,7 @@ import org.apache.drill.test.rowSet.RowSetReader; import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.Test; + /** * Test (non-array) map support in the result set loader and related classes. 
*/ @@ -104,7 +108,7 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Write another using the test-time conveniences - rootWriter.addRow(20, objArray(210, "barney"), "bam-bam"); + rootWriter.addRow(20, mapValue(210, "barney"), "bam-bam"); // Harvest the batch @@ -115,8 +119,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Validate data SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray(110, "fred"), "pebbles") - .addRow(20, objArray(210, "barney"), "bam-bam") + .addRow(10, mapValue(110, "fred"), "pebbles") + .addRow(20, mapValue(210, "barney"), "bam-bam") .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -146,8 +150,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { rsLoader.startBatch(); rootWriter - .addRow(10, objArray("fred")) - .addRow(20, objArray("barney")); + .addRow(10, mapValue("fred")) + .addRow(20, mapValue("barney")); RowSet actual = fixture.wrap(rsLoader.harvest()); assertEquals(3, rsLoader.schemaVersion()); @@ -156,8 +160,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Validate first batch SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray("fred")) - .addRow(20, objArray("barney")) + .addRow(10, mapValue("fred")) + .addRow(20, mapValue("barney")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -172,10 +176,10 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { rsLoader.startBatch(); mapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.BIGINT, DataMode.REQUIRED)); - rootWriter.addRow(30, objArray("wilma", 130, 130_000L)); + rootWriter.addRow(30, mapValue("wilma", 130, 130_000L)); mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED)); - rootWriter.addRow(40, objArray("betty", 140, 140_000L, "bam-bam")); + rootWriter.addRow(40, mapValue("betty", 140, 140_000L, "bam-bam")); actual = fixture.wrap(rsLoader.harvest()); 
assertEquals(6, rsLoader.schemaVersion()); @@ -193,8 +197,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); expected = fixture.rowSetBuilder(expectedSchema) - .addRow(30, objArray("wilma", 130, 130_000L, "")) - .addRow(40, objArray("betty", 140, 140_000L, "bam-bam")) + .addRow(30, mapValue("wilma", 130, 130_000L, "")) + .addRow(40, mapValue("betty", 140, 140_000L, "bam-bam")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -229,16 +233,25 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Add a column to the map with the same name as the top-level column. // Verifies that the name spaces are independent. - mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED)); + int colIndex = mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED)); + assertEquals(0, colIndex); + + // Ensure metadata was added + + assertTrue(mapWriter.schema().size() == 1); rootWriter - .addRow(20, objArray("fred")) - .addRow(30, objArray("barney")); + .addRow(20, mapValue("fred")) + .addRow(30, mapValue("barney")); RowSet actual = fixture.wrap(rsLoader.harvest()); assertEquals(3, rsLoader.schemaVersion()); assertEquals(3, actual.rowCount()); + MapVector mapVector = (MapVector) actual.container().getValueVector(1).getValueVector(); + MaterializedField mapField = mapVector.getField(); + assertEquals(1, mapField.getChildren().size()); + // Validate first batch TupleMetadata expectedSchema = new SchemaBuilder() @@ -248,9 +261,9 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, objArray("")) - .addRow(20, objArray("fred")) - .addRow(30, objArray("barney")) + .addRow(10, mapValue("")) + .addRow(20, mapValue("fred")) + .addRow(30, mapValue("barney")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ 
-285,8 +298,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { TupleWriter mapWriter = rootWriter.tuple(mapIndex); rootWriter - .addRow(20, objArray()) - .addRow(30, objArray()); + .addRow(20, mapValue()) + .addRow(30, mapValue()); RowSet actual = fixture.wrap(rsLoader.harvest()); assertEquals(2, rsLoader.schemaVersion()); @@ -300,9 +313,9 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, objArray()) - .addRow(20, objArray()) - .addRow(30, objArray()) + .addRow(10, mapValue()) + .addRow(20, mapValue()) + .addRow(30, mapValue()) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -313,8 +326,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED)); rootWriter - .addRow(40, objArray("fred")) - .addRow(50, objArray("barney")); + .addRow(40, mapValue("fred")) + .addRow(50, mapValue("barney")); actual = fixture.wrap(rsLoader.harvest()); assertEquals(3, rsLoader.schemaVersion()); @@ -329,8 +342,8 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); expected = fixture.rowSetBuilder(expectedSchema) - .addRow(40, objArray("fred")) - .addRow(50, objArray("barney")) + .addRow(40, mapValue("fred")) + .addRow(50, mapValue("barney")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -371,7 +384,7 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { RowSet actual = fixture.wrap(rsLoader.harvest()); assertEquals(5, rsLoader.schemaVersion()); SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray("b1", objArray("c1"))) + .addRow(10, mapValue("b1", mapValue("c1"))) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -379,21 +392,21 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Now add 
columns in the second batch. rsLoader.startBatch(); - rootWriter.addRow(20, objArray("b2", objArray("c2"))); + rootWriter.addRow(20, mapValue("b2", mapValue("c2"))); TupleWriter m1Writer = rootWriter.tuple("m1"); m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED)); TupleWriter m2Writer = m1Writer.tuple("m2"); m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED)); - rootWriter.addRow(30, objArray("b3", objArray("c3", "e3"), "d3")); + rootWriter.addRow(30, mapValue("b3", mapValue("c3", "e3"), "d3")); // And another set while the write proceeds. m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REQUIRED)); m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.REQUIRED)); - rootWriter.addRow(40, objArray("b4", objArray("c4", "e4", "g4"), "d4", "e4")); + rootWriter.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4")); // Validate second batch @@ -414,9 +427,9 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); expected = fixture.rowSetBuilder(expectedSchema) - .addRow(20, objArray("b2", objArray("c2", "", "" ), "", "" )) - .addRow(30, objArray("b3", objArray("c3", "e3", "" ), "d3", "" )) - .addRow(40, objArray("b4", objArray("c4", "e4", "g4"), "d4", "e4")) + .addRow(20, mapValue("b2", mapValue("c2", "", "" ), "", "" )) + .addRow(30, mapValue("b3", mapValue("c3", "e3", "" ), "d3", "" )) + .addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -447,13 +460,13 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { RowSetLoader rootWriter = rsLoader.writer(); rsLoader.startBatch(); - rootWriter.addRow(10, objArray("b1", objArray("c1"))); + rootWriter.addRow(10, mapValue("b1", mapValue("c1"))); // Validate first batch RowSet actual = fixture.wrap(rsLoader.harvest()); SingleRowSet expected = 
fixture.rowSetBuilder(schema) - .addRow(10, objArray("b1", objArray("c1"))) + .addRow(10, mapValue("b1", mapValue("c1"))) .build(); // actual.print(); // expected.print(); @@ -463,21 +476,21 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { // Now add columns in the second batch. rsLoader.startBatch(); - rootWriter.addRow(20, objArray("b2", objArray("c2"))); + rootWriter.addRow(20, mapValue("b2", mapValue("c2"))); TupleWriter m1Writer = rootWriter.tuple("m1"); m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL)); TupleWriter m2Writer = m1Writer.tuple("m2"); m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL)); - rootWriter.addRow(30, objArray("b3", objArray("c3", "e3"), "d3")); + rootWriter.addRow(30, mapValue("b3", mapValue("c3", "e3"), "d3")); // And another set while the write proceeds. m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.OPTIONAL)); m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.OPTIONAL)); - rootWriter.addRow(40, objArray("b4", objArray("c4", "e4", "g4"), "d4", "e4")); + rootWriter.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4")); // Validate second batch @@ -496,9 +509,9 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { .resumeSchema() .buildSchema(); expected = fixture.rowSetBuilder(expectedSchema) - .addRow(20, objArray("b2", objArray("c2", null, null), null, null)) - .addRow(30, objArray("b3", objArray("c3", "e3", null), "d3", null)) - .addRow(40, objArray("b4", objArray("c4", "e4", "g4"), "d4", "e4")) + .addRow(20, mapValue("b2", mapValue("c2", null, null), null, null)) + .addRow(30, mapValue("b3", mapValue("c3", "e3", null), "d3", null)) + .addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4")) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -532,20 +545,20 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { 
rsLoader.startBatch(); rootWriter - .addRow(10, objArray(intArray(110, 120, 130), - strArray("d1.1", "d1.2", "d1.3", "d1.4"))) - .addRow(20, objArray(intArray(210), strArray())) - .addRow(30, objArray(intArray(), strArray("d3.1"))) + .addRow(10, mapValue(intArray(110, 120, 130), + strArray("d1.1", "d1.2", "d1.3", "d1.4"))) + .addRow(20, mapValue(intArray(210), strArray())) + .addRow(30, mapValue(intArray(), strArray("d3.1"))) ; // Validate first batch RowSet actual = fixture.wrap(rsLoader.harvest()); SingleRowSet expected = fixture.rowSetBuilder(schema) - .addRow(10, objArray(intArray(110, 120, 130), - strArray("d1.1", "d1.2", "d1.3", "d1.4"))) - .addRow(20, objArray(intArray(210), strArray())) - .addRow(30, objArray(intArray(), strArray("d3.1"))) + .addRow(10, mapValue(intArray(110, 120, 130), + strArray("d1.1", "d1.2", "d1.3", "d1.4"))) + .addRow(20, mapValue(intArray(210), strArray())) + .addRow(30, mapValue(intArray(), strArray("d3.1"))) .build(); new RowSetComparison(expected).verifyAndClearAll(actual); @@ -554,15 +567,15 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { rsLoader.startBatch(); rootWriter - .addRow(40, objArray(intArray(410, 420), strArray("d4.1", "d4.2"))) - .addRow(50, objArray(intArray(510), strArray("d5.1"))) + .addRow(40, mapValue(intArray(410, 420), strArray("d4.1", "d4.2"))) + .addRow(50, mapValue(intArray(510), strArray("d5.1"))) ; TupleWriter mapWriter = rootWriter.tuple("m"); mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED)); rootWriter - .addRow(60, objArray(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2"))) - .addRow(70, objArray(intArray(710), strArray(), strArray("e7.1", "e7.2"))) + .addRow(60, mapValue(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2"))) + .addRow(70, mapValue(intArray(710), strArray(), strArray("e7.1", "e7.2"))) ; // Validate first batch. 
The new array should have been back-filled with @@ -571,10 +584,10 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { actual = fixture.wrap(rsLoader.harvest()); // System.out.println(actual.schema().toString()); expected = fixture.rowSetBuilder(actual.schema()) - .addRow(40, objArray(intArray(410, 420), strArray("d4.1", "d4.2"), strArray())) - .addRow(50, objArray(intArray(510), strArray("d5.1"), strArray())) - .addRow(60, objArray(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2"))) - .addRow(70, objArray(intArray(710), strArray(), strArray("e7.1", "e7.2"))) + .addRow(40, mapValue(intArray(410, 420), strArray("d4.1", "d4.2"), strArray())) + .addRow(50, mapValue(intArray(510), strArray("d5.1"), strArray())) + .addRow(60, mapValue(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2"))) + .addRow(70, mapValue(intArray(710), strArray(), strArray("e7.1", "e7.2"))) .build(); // expected.print(); @@ -614,7 +627,7 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { int count = 0; rsLoader.startBatch(); while (! rootWriter.isFull()) { - rootWriter.addRow(count, objArray(count * 10, objArray(count * 100, value, count * 1000))); + rootWriter.addRow(count, mapValue(count * 10, mapValue(count * 100, value, count * 1000))); count++; } @@ -810,4 +823,68 @@ public class TestResultSetLoaderMaps extends SubOperatorTest { result.clear(); rsLoader.close(); } + + /** + * Verify that map name spaces (and implementations) are + * independent. 
+ */ + + @Test + public void testNameSpace() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("a", MinorType.INT) + .addMap("m") + .add("a", MinorType.INT) + .resumeMap() + .resumeSchema() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + + // Write a row the way that clients will do. + + ScalarWriter a1Writer = rootWriter.scalar("a"); + TupleWriter m1Writer = rootWriter.tuple("m"); + ScalarWriter a2Writer = m1Writer.scalar("a"); + TupleWriter m2Writer = m1Writer.tuple("m"); + ScalarWriter a3Writer = m2Writer.scalar("a"); + + rootWriter.start(); + a1Writer.setInt(11); + a2Writer.setInt(12); + a3Writer.setInt(13); + rootWriter.save(); + + rootWriter.start(); + a1Writer.setInt(21); + a2Writer.setInt(22); + a3Writer.setInt(23); + rootWriter.save(); + + // Try simplified test format + + rootWriter.addRow(31, + mapValue(32, + mapValue(33))); + + // Verify + + RowSet actual = fixture.wrap(rsLoader.harvest()); + + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(11, mapValue(12, mapValue(13))) + .addRow(21, mapValue(22, mapValue(23))) + .addRow(31, mapValue(32, mapValue(33))) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + rsLoader.close(); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java index 5fd65a9..a2015fe 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java @@ -38,6 +38,7 @@ import org.apache.drill.test.rowSet.RowSetReader; import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.Test; + public class TestResultSetLoaderOmittedValues extends SubOperatorTest { /** @@ -140,7 +141,7 @@ public class TestResultSetLoaderOmittedValues extends SubOperatorTest { .add("a", MinorType.INT) .add("b", MinorType.VARCHAR) .addNullable("c", MinorType.VARCHAR) - .add("3", MinorType.INT) + .add("d", MinorType.INT) .addNullable("e", MinorType.INT) .addArray("f", MinorType.VARCHAR) .build(); http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java index bb82131..2a33f04 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java @@ -32,7 +32,8 @@ import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetO import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.ScalarElementReader; +import org.apache.drill.exec.vector.accessor.ArrayReader; +import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; import 
org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet; @@ -308,12 +309,14 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { RowSet result = fixture.wrap(rsLoader.harvest()); assertEquals(expectedCount, result.rowCount()); RowSetReader reader = result.reader(); - reader.set(expectedCount - 1); - ScalarElementReader arrayReader = reader.column(0).elements(); + reader.setPosn(expectedCount - 1); + ArrayReader arrayReader = reader.array(0); + ScalarReader strReader = arrayReader.scalar(); assertEquals(valuesPerArray, arrayReader.size()); for (int i = 0; i < valuesPerArray; i++) { + assertTrue(arrayReader.next()); String cellValue = strValue + (count - 1) + "." + i; - assertEquals(cellValue, arrayReader.getString(i)); + assertEquals(cellValue, strReader.getString()); } result.clear(); @@ -331,11 +334,13 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { assertEquals(1, result.rowCount()); reader = result.reader(); reader.next(); - arrayReader = reader.column(0).elements(); + arrayReader = reader.array(0); + strReader = arrayReader.scalar(); assertEquals(valuesPerArray, arrayReader.size()); for (int i = 0; i < valuesPerArray; i++) { + assertTrue(arrayReader.next()); String cellValue = strValue + (count) + "." 
+ i; - assertEquals(cellValue, arrayReader.getString(i)); + assertEquals(cellValue, strReader.getString()); } result.clear(); @@ -427,38 +432,48 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { RowSet result = fixture.wrap(rsLoader.harvest()); assertEquals(count - 1, result.rowCount()); - RowSetReader reader = result.reader(); - ScalarElementReader aReader = reader.array("a").elements(); - ScalarElementReader bReader = reader.array("b").elements(); - ScalarElementReader cReader = reader.array("c").elements(); - ScalarElementReader dReader = reader.array("d").elements(); - - while (reader.next()) { - int rowId = reader.rowIndex(); - assertEquals(aCount, aReader.size()); - for (int i = 0; i < aCount; i++) { - assertEquals(rowId * aCount + i, aReader.getInt(i)); - } - assertEquals(bCount, bReader.size()); - for (int i = 0; i < bCount; i++) { - String cellValue = strValue + (rowId * bCount + i); - assertEquals(cellValue, bReader.getString(i)); - } - if (rowId < cCutoff) { - assertEquals(cCount, cReader.size()); - for (int i = 0; i < cCount; i++) { - assertEquals(rowId * cCount + i, cReader.getInt(i)); + { + RowSetReader reader = result.reader(); + ArrayReader aArray = reader.array("a"); + ScalarReader aReader = aArray.scalar(); + ArrayReader bArray = reader.array("b"); + ScalarReader bReader = bArray.scalar(); + ArrayReader cArray = reader.array("c"); + ScalarReader cReader = cArray.scalar(); + ArrayReader dArray = reader.array("d"); + ScalarReader dReader = dArray.scalar(); + + while (reader.next()) { + int rowId = reader.offset(); + assertEquals(aCount, aArray.size()); + for (int i = 0; i < aCount; i++) { + assertTrue(aArray.next()); + assertEquals(rowId * aCount + i, aReader.getInt()); } - assertEquals(dCount, dReader.size()); - for (int i = 0; i < dCount; i++) { - assertEquals(rowId * dCount + i, dReader.getInt(i)); + assertEquals(bCount, bArray.size()); + for (int i = 0; i < bCount; i++) { + assertTrue(bArray.next()); + String cellValue = 
strValue + (rowId * bCount + i); + assertEquals(cellValue, bReader.getString()); + } + if (rowId < cCutoff) { + assertEquals(cCount, cArray.size()); + for (int i = 0; i < cCount; i++) { + assertTrue(cArray.next()); + assertEquals(rowId * cCount + i, cReader.getInt()); + } + assertEquals(dCount, dArray.size()); + for (int i = 0; i < dCount; i++) { + assertTrue(dArray.next()); + assertEquals(rowId * dCount + i, dReader.getInt()); + } + } else { + assertEquals(0, cArray.size()); + assertEquals(0, dArray.size()); } - } else { - assertEquals(0, cReader.size()); - assertEquals(0, dReader.size()); } + result.clear(); } - result.clear(); int firstCount = count - 1; // One row is in the batch. Write more, skipping over the @@ -490,41 +505,51 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { result = fixture.wrap(rsLoader.harvest()); assertEquals(6, result.rowCount()); - reader = result.reader(); - aReader = reader.array("a").elements(); - bReader = reader.array("b").elements(); - cReader = reader.array("c").elements(); - dReader = reader.array("d").elements(); - - int j = 0; - while (reader.next()) { - int rowId = firstCount + reader.rowIndex(); - assertEquals(aCount, aReader.size()); - for (int i = 0; i < aCount; i++) { - assertEquals("Index " + i, rowId * aCount + i, aReader.getInt(i)); - } - assertEquals(bCount, bReader.size()); - for (int i = 0; i < bCount; i++) { - String cellValue = strValue + (rowId * bCount + i); - assertEquals(cellValue, bReader.getString(i)); - } - if (j > 4) { - assertEquals(cCount, cReader.size()); - for (int i = 0; i < cCount; i++) { - assertEquals(rowId * cCount + i, cReader.getInt(i)); + { + RowSetReader reader = result.reader(); + ArrayReader aArray = reader.array("a"); + ScalarReader aReader = aArray.scalar(); + ArrayReader bArray = reader.array("b"); + ScalarReader bReader = bArray.scalar(); + ArrayReader cArray = reader.array("c"); + ScalarReader cReader = cArray.scalar(); + ArrayReader dArray = reader.array("d"); + 
ScalarReader dReader = dArray.scalar(); + + int j = 0; + while (reader.next()) { + int rowId = firstCount + reader.offset(); + assertEquals(aCount, aArray.size()); + for (int i = 0; i < aCount; i++) { + assertTrue(aArray.next()); + assertEquals("Index " + i, rowId * aCount + i, aReader.getInt()); } - } else { - assertEquals(0, cReader.size()); - } - if (j == 0 || j > 4) { - assertEquals(dCount, dReader.size()); - for (int i = 0; i < dCount; i++) { - assertEquals(rowId * dCount + i, dReader.getInt(i)); + assertEquals(bCount, bArray.size()); + for (int i = 0; i < bCount; i++) { + assertTrue(bArray.next()); + String cellValue = strValue + (rowId * bCount + i); + assertEquals(cellValue, bReader.getString()); } - } else { - assertEquals(0, dReader.size()); + if (j > 4) { + assertEquals(cCount, cArray.size()); + for (int i = 0; i < cCount; i++) { + assertTrue(cArray.next()); + assertEquals(rowId * cCount + i, cReader.getInt()); + } + } else { + assertEquals(0, cArray.size()); + } + if (j == 0 || j > 4) { + assertEquals(dCount, dArray.size()); + for (int i = 0; i < dCount; i++) { + assertTrue(dArray.next()); + assertEquals(rowId * dCount + i, dReader.getInt()); + } + } else { + assertEquals(0, dArray.size()); + } + j++; } - j++; } result.clear(); @@ -602,17 +627,19 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { RowSet result = fixture.wrap(rsLoader.harvest()); assertEquals(rowId - 1, result.rowCount()); RowSetReader reader = result.reader(); - ScalarElementReader cReader = reader.array("c").elements(); + ArrayReader cArray = reader.array("c"); + ScalarReader cReader = cArray.scalar(); while (reader.next()) { - assertEquals(reader.rowIndex(), reader.scalar("a").getInt()); + assertEquals(reader.offset(), reader.scalar("a").getInt()); assertTrue(Arrays.equals(value, reader.scalar("b").getBytes())); - if (reader.rowIndex() < blankAfter) { - assertEquals(3, cReader.size()); + if (reader.offset() < blankAfter) { + assertEquals(3, cArray.size()); for (int 
i = 0; i < 3; i++) { - assertEquals(reader.rowIndex() * 3 + i, cReader.getInt(i)); + assertTrue(cArray.next()); + assertEquals(reader.offset() * 3 + i, cReader.getInt()); } } else { - assertEquals(0, cReader.size()); + assertEquals(0, cArray.size()); } } result.clear(); @@ -655,7 +682,7 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { RowSetReader reader = result.reader(); while (reader.next()) { - assertEquals(reader.rowIndex(), reader.scalar(0).getInt()); + assertEquals(reader.offset(), reader.scalar(0).getInt()); assertTrue(reader.scalar(1).isNull()); assertTrue(Arrays.equals(value, reader.scalar(2).getBytes())); assertTrue(reader.scalar(3).isNull()); http://git-wip-us.apache.org/repos/asf/drill/blob/4f2182e4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java index b69e797..95aa2b7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.physical.rowSet.impl; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,103 +43,12 @@ import org.apache.drill.test.rowSet.RowSetComparison; import 
org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.Test; -import com.google.common.collect.Lists; - /** * Test of the basics of the projection mechanism. */ public class TestResultSetLoaderProjection extends SubOperatorTest { - @Test - public void testProjectionMap() { - - // Null map means everything is projected - - { - ProjectionSet projSet = ProjectionSetImpl.parse(null); - assertTrue(projSet instanceof NullProjectionSet); - assertTrue(projSet.isProjected("foo")); - } - - // Empty list means everything is projected - - { - ProjectionSet projSet = ProjectionSetImpl.parse(new ArrayList<SchemaPath>()); - assertTrue(projSet instanceof NullProjectionSet); - assertTrue(projSet.isProjected("foo")); - } - - // Simple non-map columns - - { - List<SchemaPath> projCols = new ArrayList<>(); - projCols.add(SchemaPath.getSimplePath("foo")); - projCols.add(SchemaPath.getSimplePath("bar")); - ProjectionSet projSet = ProjectionSetImpl.parse(projCols); - assertTrue(projSet instanceof ProjectionSetImpl); - assertTrue(projSet.isProjected("foo")); - assertTrue(projSet.isProjected("bar")); - assertFalse(projSet.isProjected("mumble")); - } - - // Whole-map projection (note, fully projected maps are - // identical to projected simple columns at this level of - // abstraction.) - - { - List<SchemaPath> projCols = new ArrayList<>(); - projCols.add(SchemaPath.getSimplePath("map")); - ProjectionSet projSet = ProjectionSetImpl.parse(projCols); - assertTrue(projSet instanceof ProjectionSetImpl); - assertTrue(projSet.isProjected("map")); - assertFalse(projSet.isProjected("another")); - ProjectionSet mapProj = projSet.mapProjection("map"); - assertNotNull(mapProj); - assertTrue(mapProj instanceof NullProjectionSet); - assertTrue(mapProj.isProjected("foo")); - assertNotNull(projSet.mapProjection("another")); - assertFalse(projSet.mapProjection("another").isProjected("anyCol")); - } - - // Selected map projection, multiple levels, full projection - // at leaf level. 
- - { - List<SchemaPath> projCols = new ArrayList<>(); - projCols.add(SchemaPath.getCompoundPath("map", "a")); - projCols.add(SchemaPath.getCompoundPath("map", "b")); - projCols.add(SchemaPath.getCompoundPath("map", "map2", "x")); - ProjectionSet projSet = ProjectionSetImpl.parse(projCols); - assertTrue(projSet instanceof ProjectionSetImpl); - assertTrue(projSet.isProjected("map")); - - // Map: an explicit map at top level - - ProjectionSet mapProj = projSet.mapProjection("map"); - assertTrue(mapProj instanceof ProjectionSetImpl); - assertTrue(mapProj.isProjected("a")); - assertTrue(mapProj.isProjected("b")); - assertTrue(mapProj.isProjected("map2")); - assertFalse(projSet.isProjected("bogus")); - - // Map b: an implied nested map - - ProjectionSet bMapProj = mapProj.mapProjection("b"); - assertNotNull(bMapProj); - assertTrue(bMapProj instanceof NullProjectionSet); - assertTrue(bMapProj.isProjected("foo")); - - // Map2, an nested map, has an explicit projection - - ProjectionSet map2Proj = mapProj.mapProjection("map2"); - assertNotNull(map2Proj); - assertTrue(map2Proj instanceof ProjectionSetImpl); - assertTrue(map2Proj.isProjected("x")); - assertFalse(map2Proj.isProjected("bogus")); - } - } - /** * Test imposing a selection mask between the client and the underlying * vector container. 
@@ -148,10 +56,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { @Test public void testProjectionStatic() { - List<SchemaPath> selection = Lists.newArrayList( - SchemaPath.getSimplePath("c"), - SchemaPath.getSimplePath("b"), - SchemaPath.getSimplePath("e")); + List<SchemaPath> selection = RowSetTestUtils.projectList("c", "b", "e"); TupleMetadata schema = new SchemaBuilder() .add("a", MinorType.INT) .add("b", MinorType.INT) @@ -169,10 +74,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { @Test public void testProjectionDynamic() { - List<SchemaPath> selection = Lists.newArrayList( - SchemaPath.getSimplePath("c"), - SchemaPath.getSimplePath("b"), - SchemaPath.getSimplePath("e")); + List<SchemaPath> selection = RowSetTestUtils.projectList("c", "b", "e"); ResultSetOptions options = new OptionBuilder() .setProjection(selection) .build(); @@ -241,9 +143,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { @Test public void testMapProjection() { - List<SchemaPath> selection = Lists.newArrayList( - SchemaPath.getSimplePath("m1"), - SchemaPath.getCompoundPath("m2", "d")); + List<SchemaPath> selection = RowSetTestUtils.projectList("m1", "m2.d"); TupleMetadata schema = new SchemaBuilder() .addMap("m1") .add("a", MinorType.INT) @@ -293,22 +193,9 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { rsLoader.startBatch(); rootWriter.start(); - rootWriter.tuple("m1").scalar("a").setInt(1); - rootWriter.tuple("m1").scalar("b").setInt(2); - rootWriter.tuple("m2").scalar("c").setInt(3); - rootWriter.tuple("m2").scalar("d").setInt(4); - rootWriter.tuple("m3").scalar("e").setInt(5); - rootWriter.tuple("m3").scalar("f").setInt(6); - rootWriter.save(); - - rootWriter.start(); - rootWriter.tuple("m1").scalar("a").setInt(11); - rootWriter.tuple("m1").scalar("b").setInt(12); - rootWriter.tuple("m2").scalar("c").setInt(13); - rootWriter.tuple("m2").scalar("d").setInt(14); - 
rootWriter.tuple("m3").scalar("e").setInt(15); - rootWriter.tuple("m3").scalar("f").setInt(16); - rootWriter.save(); + rootWriter + .addRow(mapValue( 1, 2), mapValue( 3, 4), mapValue( 5, 6)) + .addRow(mapValue(11, 12), mapValue(13, 14), mapValue(15, 16)); // Verify. Only the projected columns appear in the result set. @@ -322,8 +209,8 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { .resumeSchema() .build(); SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(objArray(1, 2), objArray(4)) - .addRow(objArray(11, 12), objArray(14)) + .addRow(mapValue( 1, 2), mapValue( 4)) + .addRow(mapValue(11, 12), mapValue(14)) .build(); new RowSetComparison(expected) .verifyAndClearAll(fixture.wrap(rsLoader.harvest())); @@ -338,9 +225,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { @Test public void testMapArrayProjection() { - List<SchemaPath> selection = Lists.newArrayList( - SchemaPath.getSimplePath("m1"), - SchemaPath.getCompoundPath("m2", "d")); + List<SchemaPath> selection = RowSetTestUtils.projectList("m1", "m2.d"); TupleMetadata schema = new SchemaBuilder() .addMapArray("m1") .add("a", MinorType.INT) @@ -408,9 +293,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest { @Test public void testProjectWithOverflow() { - List<SchemaPath> selection = Lists.newArrayList( - SchemaPath.getSimplePath("small"), - SchemaPath.getSimplePath("dummy")); + List<SchemaPath> selection = RowSetTestUtils.projectList("small", "dummy"); TupleMetadata schema = new SchemaBuilder() .add("big", MinorType.VARCHAR) .add("small", MinorType.VARCHAR)