http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java index 030f95a..e1e18dc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java @@ -17,23 +17,13 @@ */ package org.apache.drill.test.rowSet; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; +import org.apache.drill.exec.physical.rowSet.model.single.BaseReaderBuilder; +import org.apache.drill.exec.record.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader; -import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory; -import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema; -import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn; -import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema; /** * Base class for row 
sets backed by a single record batch. @@ -41,151 +31,27 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema; public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet { - /** - * Internal helper class to organize a set of value vectors for use by the - * row set class. Subclasses either build vectors from a schema, or map an - * existing vector container into the row set structure. The row set - * structure is based on a flattened structure; all vectors appear in - * a single vector array. Maps are set aside in a separate map list. - */ - - public abstract static class StructureBuilder { - protected final PhysicalSchema schema; - protected final BufferAllocator allocator; - protected final ValueVector[] valueVectors; - protected final MapVector[] mapVectors; - protected int vectorIndex; - protected int mapIndex; - - public StructureBuilder(BufferAllocator allocator, RowSetSchema schema) { - this.allocator = allocator; - this.schema = schema.physical(); - FlattenedSchema flatSchema = schema.flatAccess(); - valueVectors = new ValueVector[flatSchema.count()]; - if (flatSchema.mapCount() == 0) { - mapVectors = null; - } else { - mapVectors = new MapVector[flatSchema.mapCount()]; - } - } - } - - /** - * Create a set of value vectors given a schema, then map them into both - * the value container and the row set structure. 
- */ - - public static class VectorBuilder extends StructureBuilder { - - public VectorBuilder(BufferAllocator allocator, RowSetSchema schema) { - super(allocator, schema); - } - - public ValueVector[] buildContainer(VectorContainer container) { - for (int i = 0; i < schema.count(); i++) { - LogicalColumn colSchema = schema.column(i); - @SuppressWarnings("resource") - ValueVector v = TypeHelper.getNewVector(colSchema.field, allocator, null); - container.add(v); - if (colSchema.field.getType().getMinorType() == MinorType.MAP) { - MapVector mv = (MapVector) v; - mapVectors[mapIndex++] = mv; - buildMap(mv, colSchema.mapSchema); - } else { - valueVectors[vectorIndex++] = v; - } - } - container.buildSchema(SelectionVectorMode.NONE); - return valueVectors; - } - - private void buildMap(MapVector mapVector, PhysicalSchema mapSchema) { - for (int i = 0; i < mapSchema.count(); i++) { - LogicalColumn colSchema = mapSchema.column(i); - MajorType type = colSchema.field.getType(); - Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()); - @SuppressWarnings("resource") - ValueVector v = mapVector.addOrGet(colSchema.field.getName(), type, vectorClass); - if (type.getMinorType() == MinorType.MAP) { - MapVector mv = (MapVector) v; - mapVectors[mapIndex++] = mv; - buildMap(mv, colSchema.mapSchema); - } else { - valueVectors[vectorIndex++] = v; - } - } - } - } - - /** - * Build a row set given an existing vector container. In this case, - * the vectors exist and we simply need to pull them out of the container - * and maps and put them into the row set arrays. 
- */ - - public static class VectorMapper extends StructureBuilder { - - public VectorMapper(BufferAllocator allocator, RowSetSchema schema) { - super(allocator, schema); - } + public static class RowSetReaderBuilder extends BaseReaderBuilder { - public ValueVector[] mapContainer(VectorContainer container) { - for (VectorWrapper<?> w : container) { - @SuppressWarnings("resource") - ValueVector v = w.getValueVector(); - if (v.getField().getType().getMinorType() == MinorType.MAP) { - MapVector mv = (MapVector) v; - mapVectors[mapIndex++] = mv; - buildMap(mv); - } else { - valueVectors[vectorIndex++] = v; - } - } - return valueVectors; + public RowSetReader buildReader(AbstractSingleRowSet rowSet, ReaderIndex rowIndex) { + TupleMetadata schema = rowSet.schema(); + return new RowSetReaderImpl(schema, rowIndex, + buildContainerChildren(rowSet.container(), + new MetadataRetrieval(schema))); } - - private void buildMap(MapVector mapVector) { - for (ValueVector v : mapVector) { - if (v.getField().getType().getMinorType() == MinorType.MAP) { - MapVector mv = (MapVector) v; - mapVectors[mapIndex++] = mv; - buildMap(mv); - } else { - valueVectors[vectorIndex++] = v; - } - } - } - } - - /** - * Flattened representation of value vectors using a depth-first - * traversal of maps. Order of vectors here correspond to the column - * indexes used to access columns in a reader or writer. 
- */ - - protected final ValueVector[] valueVectors; - - public AbstractSingleRowSet(BufferAllocator allocator, BatchSchema schema) { - super(allocator, schema, new VectorContainer()); - valueVectors = new VectorBuilder(allocator, super.schema).buildContainer(container); - } - - public AbstractSingleRowSet(BufferAllocator allocator, VectorContainer container) { - super(allocator, container.getSchema(), container); - valueVectors = new VectorMapper(allocator, super.schema).mapContainer(container); } public AbstractSingleRowSet(AbstractSingleRowSet rowSet) { - super(rowSet.allocator, rowSet.schema.batch(), rowSet.container); - valueVectors = rowSet.valueVectors; + super(rowSet.container, rowSet.schema); } - @Override - public ValueVector[] vectors() { return valueVectors; } + public AbstractSingleRowSet(VectorContainer container, TupleMetadata schema) { + super(container, schema); + } @Override public long size() { - RecordBatchSizer sizer = new RecordBatchSizer(container); + RecordBatchSizer sizer = new RecordBatchSizer(container()); return sizer.actualSize(); } @@ -197,21 +63,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin * (non-map) vectors. 
*/ - protected RowSetReader buildReader(RowSetIndex rowIndex) { - FlattenedSchema accessSchema = schema().flatAccess(); - ValueVector[] valueVectors = vectors(); - AbstractColumnReader[] readers = new AbstractColumnReader[valueVectors.length]; - for (int i = 0; i < readers.length; i++) { - MinorType type = accessSchema.column(i).getType().getMinorType(); - if (type == MinorType.MAP) { - readers[i] = null; // buildMapAccessor(i); - } else if (type == MinorType.LIST) { - readers[i] = null; // buildListAccessor(i); - } else { - readers[i] = ColumnAccessorFactory.newReader(valueVectors[i].getField().getType()); - readers[i].bind(rowIndex, valueVectors[i]); - } - } - return new RowSetReaderImpl(accessSchema, rowIndex, readers); + protected RowSetReader buildReader(ReaderIndex rowIndex) { + return new RowSetReaderBuilder().buildReader(this, rowIndex); } }
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java index 29a1702..5972f05 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java @@ -18,19 +18,21 @@ package org.apache.drill.test.rowSet; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; +import org.apache.drill.exec.physical.rowSet.model.SchemaInference; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; +import org.apache.drill.exec.physical.rowSet.model.single.BaseWriterBuilder; +import org.apache.drill.exec.physical.rowSet.model.single.BuildVectorsFromMetadata; +import org.apache.drill.exec.physical.rowSet.model.single.VectorAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema; -import org.apache.drill.exec.vector.accessor.impl.AbstractColumnWriter; -import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory; -import org.apache.drill.exec.vector.accessor.impl.TupleWriterImpl; import 
org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; +import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl; /** * Implementation of a single row set with no indirection (selection) @@ -46,118 +48,54 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS * the first. (This is the JDBC RecordSet convention.) */ - private static class DirectRowIndex extends BoundedRowIndex { + private static class DirectRowIndex extends ReaderIndex { public DirectRowIndex(int rowCount) { super(rowCount); } @Override - public int index() { return rowIndex; } + public int vectorIndex() { return rowIndex; } @Override - public int batch() { return 0; } + public int batchIndex() { return 0; } } - /** - * Writer index that points to each row in the row set. The index starts at - * the 0th row and advances one row on each increment. This allows writers to - * start positioned at the first row. Writes happen in the current row. - * Calling <tt>next()</tt> advances to the next position, effectively saving - * the current row. The most recent row can be abandoned easily simply by not - * calling <tt>next()</tt>. This means that the number of completed rows is - * the same as the row index. 
- */ - - private static class ExtendableRowIndex extends RowSetIndex { - - private final int maxSize; - - public ExtendableRowIndex(int maxSize) { - this.maxSize = maxSize; - rowIndex = 0; - } + public static class RowSetWriterBuilder extends BaseWriterBuilder { - @Override - public int index() { return rowIndex; } - - @Override - public boolean next() { - if (++rowIndex <= maxSize ) { - return true; - } else { - rowIndex--; - return false; - } + public RowSetWriter buildWriter(DirectRowSet rowSet) { + WriterIndexImpl index = new WriterIndexImpl(); + TupleMetadata schema = rowSet.schema(); + RowSetWriterImpl writer = new RowSetWriterImpl(rowSet, schema, index, + buildContainerChildren(rowSet.container(), + new MetadataRetrieval(schema))); + return writer; } - - @Override - public int size() { return rowIndex; } - - @Override - public boolean valid() { return rowIndex < maxSize; } - - @Override - public int batch() { return 0; } } - /** - * Implementation of a row set writer. Only available for newly-created, - * empty, direct, single row sets. Rewriting is not allowed, nor is writing - * to a hyper row set. - */ - - public class RowSetWriterImpl extends TupleWriterImpl implements RowSetWriter { - - private final ExtendableRowIndex index; - private final ExtendableRowSet rowSet; - - protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleSchema schema, ExtendableRowIndex index, AbstractColumnWriter[] writers) { - super(schema, writers); - this.rowSet = rowSet; - this.index = index; - start(); - } - - @Override - public void setRow(Object...values) { - if (! 
index.valid()) { - throw new IndexOutOfBoundsException("Write past end of row set"); - } - for (int i = 0; i < values.length; i++) { - set(i, values[i]); - } - save(); - } - - @Override - public boolean valid() { return index.valid(); } - - @Override - public int index() { return index.position(); } + private DirectRowSet(VectorContainer container, TupleMetadata schema) { + super(container, schema); + } - @Override - public void save() { - index.next(); - start(); - } + public DirectRowSet(AbstractSingleRowSet from) { + super(from); + } - @Override - public void done() { - rowSet.setRowCount(index.size()); - } + public static DirectRowSet fromSchema(BufferAllocator allocator, BatchSchema schema) { + return fromSchema(allocator, TupleSchema.fromFields(schema)); } - public DirectRowSet(BufferAllocator allocator, BatchSchema schema) { - super(allocator, schema); + public static DirectRowSet fromSchema(BufferAllocator allocator, TupleMetadata schema) { + BuildVectorsFromMetadata builder = new BuildVectorsFromMetadata(allocator); + return new DirectRowSet(builder.build(schema), schema); } - public DirectRowSet(BufferAllocator allocator, VectorContainer container) { - super(allocator, container); + public static DirectRowSet fromContainer(VectorContainer container) { + return new DirectRowSet(container, new SchemaInference().infer(container)); } - public DirectRowSet(BufferAllocator allocator, VectorAccessible va) { - super(allocator, toContainer(va, allocator)); + public static DirectRowSet fromVectorAccessible(BufferAllocator allocator, VectorAccessible va) { + return fromContainer(toContainer(va, allocator)); } private static VectorContainer toContainer(VectorAccessible va, BufferAllocator allocator) { @@ -168,16 +106,8 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS } @Override - public void allocate(int recordCount) { - for (final ValueVector v : valueVectors) { - AllocationHelper.allocate(v, recordCount, 50, 10); - } - } - - 
@Override - public void setRowCount(int rowCount) { - container.setRecordCount(rowCount); - VectorAccessibleUtilities.setValueCount(container, rowCount); + public void allocate(int rowCount) { + new VectorAllocator(container()).allocate(rowCount, schema()); } @Override @@ -187,29 +117,11 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS @Override public RowSetWriter writer(int initialRowCount) { - if (container.hasRecordCount()) { + if (container().hasRecordCount()) { throw new IllegalStateException("Row set already contains data"); } allocate(initialRowCount); - return buildWriter(new ExtendableRowIndex(Character.MAX_VALUE)); - } - - /** - * Build writer objects for each column based on the column type. - * - * @param rowIndex the index which points to each row - * @return an array of writers - */ - - protected RowSetWriter buildWriter(ExtendableRowIndex rowIndex) { - ValueVector[] valueVectors = vectors(); - AbstractColumnWriter[] writers = new AbstractColumnWriter[valueVectors.length]; - for (int i = 0; i < writers.length; i++) { - writers[i] = ColumnAccessorFactory.newWriter(valueVectors[i].getField().getType()); - writers[i].bind(rowIndex, valueVectors[i]); - } - TupleSchema accessSchema = schema().hierarchicalAccess(); - return new RowSetWriterImpl(this, accessSchema, rowIndex, writers); + return new RowSetWriterBuilder().buildWriter(this); } @Override @@ -233,9 +145,4 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS @Override public SelectionVector2 getSv2() { return null; } - - @Override - public RowSet merge(RowSet other) { - return new DirectRowSet(allocator, container().merge(other.container())); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java index afc2e6e..8a3db9f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java @@ -17,27 +17,14 @@ */ package org.apache.drill.test.rowSet; -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; +import org.apache.drill.exec.physical.rowSet.model.SchemaInference; +import org.apache.drill.exec.physical.rowSet.model.hyper.BaseReaderBuilder; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.HyperVectorWrapper; -import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.AccessorUtilities; -import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader; -import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader.VectorAccessor; -import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory; -import org.apache.drill.exec.vector.complex.AbstractMapVector; import org.apache.drill.test.rowSet.RowSet.HyperRowSet; -import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema; -import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn; -import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema; /** * Implements a row set wrapper around a collection of "hyper vectors." 
@@ -52,176 +39,14 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema; public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet { - /** - * Read-only row index into the hyper row set with batch and index - * values mapping via an SV4. - */ - - public static class HyperRowIndex extends BoundedRowIndex { - - private final SelectionVector4 sv4; - - public HyperRowIndex(SelectionVector4 sv4) { - super(sv4.getCount()); - this.sv4 = sv4; - } - - @Override - public int index() { - return AccessorUtilities.sv4Index(sv4.get(rowIndex)); - } - - @Override - public int batch( ) { - return AccessorUtilities.sv4Batch(sv4.get(rowIndex)); - } - } - - /** - * Vector accessor used by the column accessors to obtain the vector for - * each column value. That is, position 0 might be batch 4, index 3, - * while position 1 might be batch 1, index 7, and so on. - */ - - public static class HyperVectorAccessor implements VectorAccessor { + public static class RowSetReaderBuilder extends BaseReaderBuilder { - private final HyperRowIndex rowIndex; - private final ValueVector[] vectors; - - public HyperVectorAccessor(HyperVectorWrapper<ValueVector> hvw, HyperRowIndex rowIndex) { - this.rowIndex = rowIndex; - vectors = hvw.getValueVectors(); - } - - @Override - public ValueVector vector() { - return vectors[rowIndex.batch()]; - } - } - - /** - * Build a hyper row set by restructuring a hyper vector bundle into a uniform - * shape. 
Consider this schema: <pre><code> - * { a: 10, b: { c: 20, d: { e: 30 } } }</code></pre> - * <p> - * The hyper container, with two batches, has this structure: - * <table border="1"> - * <tr><th>Batch</th><th>a</th><th>b</th></tr> - * <tr><td>0</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th> - * <tr><td>1</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th> - * </table> - * <p> - * The above table shows that top-level scalar vectors (such as the Int Vector for column - * a) appear "end-to-end" as a hyper-vector. Maps also appear end-to-end. But, the - * contents of the map (column c) do not appear end-to-end. Instead, they appear as - * contents in the map vector. To get to c, one indexes into the map vector, steps inside - * the map to find c and indexes to the right row. - * <p> - * Similarly, the maps for d do not appear end-to-end, one must step to the right batch - * in b, then step to d. - * <p> - * Finally, to get to e, one must step - * into the hyper vector for b, then steps to the proper batch, steps to d, step to e - * and finally step to the row within e. This is a very complex, costly indexing scheme - * that differs depending on map nesting depth. - * <p> - * To simplify access, this class restructures the maps to flatten the scalar vectors - * into end-to-end hyper vectors. For example, for the above: - * <p> - * <table border="1"> - * <tr><th>Batch</th><th>a</th><th>c</th><th>d</th></tr> - * <tr><td>0</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th> - * <tr><td>1</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th> - * </table> - * - * The maps are still available as hyper vectors, but separated into map fields. - * (Scalar access no longer needs to access the maps.) The result is a uniform - * addressing scheme for both top-level and nested vectors. 
- */ - - public static class HyperVectorBuilder { - - protected final HyperVectorWrapper<?> valueVectors[]; - protected final HyperVectorWrapper<AbstractMapVector> mapVectors[]; - private final List<ValueVector> nestedScalars[]; - private int vectorIndex; - private int mapIndex; - private final PhysicalSchema physicalSchema; - - @SuppressWarnings("unchecked") - public HyperVectorBuilder(RowSetSchema schema) { - physicalSchema = schema.physical(); - FlattenedSchema flatSchema = schema.flatAccess(); - valueVectors = new HyperVectorWrapper<?>[schema.hierarchicalAccess().count()]; - if (flatSchema.mapCount() == 0) { - mapVectors = null; - nestedScalars = null; - } else { - mapVectors = (HyperVectorWrapper<AbstractMapVector>[]) - new HyperVectorWrapper<?>[flatSchema.mapCount()]; - nestedScalars = new ArrayList[flatSchema.count()]; - } - } - - @SuppressWarnings("unchecked") - public HyperVectorWrapper<ValueVector>[] mapContainer(VectorContainer container) { - int i = 0; - for (VectorWrapper<?> w : container) { - HyperVectorWrapper<?> hvw = (HyperVectorWrapper<?>) w; - if (w.getField().getType().getMinorType() == MinorType.MAP) { - HyperVectorWrapper<AbstractMapVector> mw = (HyperVectorWrapper<AbstractMapVector>) hvw; - mapVectors[mapIndex++] = mw; - buildHyperMap(physicalSchema.column(i).mapSchema(), mw); - } else { - valueVectors[vectorIndex++] = hvw; - } - i++; - } - if (nestedScalars != null) { - buildNestedHyperVectors(); - } - return (HyperVectorWrapper<ValueVector>[]) valueVectors; - } - - private void buildHyperMap(PhysicalSchema mapSchema, HyperVectorWrapper<AbstractMapVector> mapWrapper) { - createHyperVectors(mapSchema); - for (AbstractMapVector mapVector : mapWrapper.getValueVectors()) { - buildMap(mapSchema, mapVector); - } - } - - private void buildMap(PhysicalSchema mapSchema, AbstractMapVector mapVector) { - for (ValueVector v : mapVector) { - LogicalColumn col = mapSchema.column(v.getField().getName()); - if (col.isMap()) { - buildMap(col.mapSchema, 
(AbstractMapVector) v); - } else { - nestedScalars[col.accessIndex()].add(v); - } - } - } - - private void createHyperVectors(PhysicalSchema mapSchema) { - for (int i = 0; i < mapSchema.count(); i++) { - LogicalColumn col = mapSchema.column(i); - if (col.isMap()) { - createHyperVectors(col.mapSchema); - } else { - nestedScalars[col.accessIndex()] = new ArrayList<ValueVector>(); - } - } - } - - private void buildNestedHyperVectors() { - for (int i = 0; i < nestedScalars.length; i++) { - if (nestedScalars[i] == null) { - continue; - } - ValueVector vectors[] = new ValueVector[nestedScalars[i].size()]; - nestedScalars[i].toArray(vectors); - assert valueVectors[i] == null; - valueVectors[i] = new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors, false); - } + public RowSetReader buildReader(HyperRowSet rowSet, SelectionVector4 sv4) { + TupleMetadata schema = rowSet.schema(); + HyperRowIndex rowIndex = new HyperRowIndex(sv4); + return new RowSetReaderImpl(schema, rowIndex, + buildContainerChildren(rowSet.container(), + new MetadataRetrieval(schema))); } } @@ -231,18 +56,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet { private final SelectionVector4 sv4; - /** - * Collection of hyper vectors in flattened order: a left-to-right, - * depth first ordering of vectors in maps. Order here corresponds to - * the order used for column indexes in the row set reader. 
- */ - - private final HyperVectorWrapper<ValueVector> hvw[]; - - public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) { - super(allocator, container.getSchema(), container); + public HyperRowSetImpl(VectorContainer container, SelectionVector4 sv4) { + super(container, new SchemaInference().infer(container)); this.sv4 = sv4; - hvw = new HyperVectorBuilder(schema).mapContainer(container); } @Override @@ -252,33 +68,8 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet { public boolean isWritable() { return false; } @Override - public RowSetWriter writer() { - throw new UnsupportedOperationException("Cannot write to a hyper vector"); - } - - @Override public RowSetReader reader() { - return buildReader(new HyperRowIndex(sv4)); - } - - /** - * Internal method to build the set of column readers needed for - * this row set. Used when building a row set reader. - * @param rowIndex object that points to the current row - * @return an array of column readers: in the same order as the - * (non-map) vectors. 
- */ - - protected RowSetReader buildReader(HyperRowIndex rowIndex) { - FlattenedSchema accessSchema = schema().flatAccess(); - AbstractColumnReader readers[] = new AbstractColumnReader[accessSchema.count()]; - for (int i = 0; i < readers.length; i++) { - MaterializedField field = accessSchema.column(i); - readers[i] = ColumnAccessorFactory.newReader(field.getType()); - HyperVectorWrapper<ValueVector> hvw = getHyperVector(i); - readers[i].bind(rowIndex, field, new HyperVectorAccessor(hvw, rowIndex)); - } - return new RowSetReaderImpl(accessSchema, rowIndex, readers); + return new RowSetReaderBuilder().buildReader(this, sv4); } @Override @@ -288,13 +79,5 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet { public SelectionVector4 getSv4() { return sv4; } @Override - public HyperVectorWrapper<ValueVector> getHyperVector(int i) { return hvw[i]; } - - @Override public int rowCount() { return sv4.getCount(); } - - @Override - public RowSet merge(RowSet other) { - return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index 1914705..e729bba 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -20,6 +20,8 @@ package org.apache.drill.test.rowSet; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; +import 
org.apache.drill.exec.physical.rowSet.model.SchemaInference; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -33,14 +35,14 @@ public class IndirectRowSet extends AbstractSingleRowSet { /** * Reader index that points to each row indirectly through the - * selection vector. The {@link #index()} method points to the + * selection vector. The {@link #vectorIndex()} method points to the * actual data row, while the {@link #position()} method gives * the position relative to the indirection vector. That is, * the position increases monotonically, but the index jumps * around as specified by the indirection vector. */ - private static class IndirectRowIndex extends BoundedRowIndex { + private static class IndirectRowIndex extends ReaderIndex { private final SelectionVector2 sv2; @@ -50,21 +52,25 @@ public class IndirectRowSet extends AbstractSingleRowSet { } @Override - public int index() { return sv2.getIndex(rowIndex); } + public int vectorIndex() { return sv2.getIndex(rowIndex); } @Override - public int batch() { return 0; } + public int batchIndex() { return 0; } } private final SelectionVector2 sv2; - public IndirectRowSet(BufferAllocator allocator, VectorContainer container) { - this(allocator, container, makeSv2(allocator, container)); + private IndirectRowSet(VectorContainer container, SelectionVector2 sv2) { + super(container, new SchemaInference().infer(container)); + this.sv2 = sv2; } - public IndirectRowSet(BufferAllocator allocator, VectorContainer container, SelectionVector2 sv2) { - super(allocator, container); - this.sv2 = sv2; + public static IndirectRowSet fromContainer(VectorContainer container) { + return new IndirectRowSet(container, makeSv2(container.getAllocator(), container)); + } + + public static IndirectRowSet fromSv2(VectorContainer container, SelectionVector2 sv2) { + return new IndirectRowSet(container, 
sv2); } private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) { @@ -83,7 +89,7 @@ public class IndirectRowSet extends AbstractSingleRowSet { public IndirectRowSet(DirectRowSet directRowSet) { super(directRowSet); - sv2 = makeSv2(allocator, container); + sv2 = makeSv2(allocator(), container()); } @Override @@ -96,11 +102,6 @@ public class IndirectRowSet extends AbstractSingleRowSet { } @Override - public RowSetWriter writer() { - throw new UnsupportedOperationException("Cannot write to an existing row set"); - } - - @Override public RowSetReader reader() { return buildReader(new IndirectRowIndex(getSv2())); } @@ -119,12 +120,7 @@ public class IndirectRowSet extends AbstractSingleRowSet { @Override public long size() { - RecordBatchSizer sizer = new RecordBatchSizer(container, sv2); + RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2); return sizer.actualSize(); } - - @Override - public RowSet merge(RowSet other) { - return new IndirectRowSet(allocator, container().merge(other.container()), sv2); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java index 474508c..f2435de 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java @@ -20,25 +20,24 @@ package org.apache.drill.test.rowSet; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.HyperVectorWrapper; +import org.apache.drill.exec.record.TupleMetadata; import org.apache.drill.exec.record.VectorAccessible; 
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.ColumnReader; -import org.apache.drill.exec.vector.accessor.ColumnWriter; -import org.apache.drill.exec.vector.accessor.TupleReader; -import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.accessor.ScalarReader; +import org.apache.parquet.column.ColumnWriter; /** * A row set is a collection of rows stored as value vectors. Elsewhere in * Drill we call this a "record batch", but that term has been overloaded to - * mean the runtime implementation of an operator... + * mean the runtime implementation of an operator. * <p> * A row set encapsulates a set of vectors and provides access to Drill's * various "views" of vectors: {@link VectorContainer}, - * {@link VectorAccessible}, etc. + * {@link VectorAccessible}, etc. The row set wraps a {#link TupleModel} + * which holds the vectors and column metadata. This form is optimized + * for easy use in testing; use other implementations for production code. * <p> * A row set is defined by a {@link RowSetSchema}. For testing purposes, a row * set has a fixed schema; we don't allow changing the set of vectors @@ -52,7 +51,7 @@ import org.apache.drill.exec.vector.accessor.TupleWriter; * Drill provides a large number of vector (data) types. Each requires a * type-specific way to set data. The row set writer uses a {@link ColumnWriter} * to set each value in a way unique to the specific data type. Similarly, the - * row set reader provides a {@link ColumnReader} interface. In both cases, + * row set reader provides a {@link ScalarReader} interface. In both cases, * columns can be accessed by index number (as defined in the schema) or * by name. 
* <p> @@ -78,54 +77,6 @@ import org.apache.drill.exec.vector.accessor.TupleWriter; public interface RowSet { - /** - * Interface for writing values to a row set. Only available - * for newly-created, single, direct row sets. Eventually, if - * we want to allow updating a row set, we have to create a - * new row set with the updated columns, then merge the new - * and old row sets to create a new immutable row set. - */ - interface RowSetWriter extends TupleWriter { - void setRow(Object...values); - boolean valid(); - int index(); - void save(); - void done(); - } - - /** - * Reader for all types of row sets. - */ - interface RowSetReader extends TupleReader { - - /** - * Total number of rows in the row set. - * @return total number of rows - */ - int size(); - - boolean next(); - int index(); - void set(int index); - - /** - * Batch index: 0 for a single batch, batch for the current - * row is a hyper-batch. - * @return index of the batch for the current row - */ - int batchIndex(); - - /** - * The index of the underlying row which may be indexed by an - * Sv2 or Sv4. - * - * @return - */ - - int rowIndex(); - boolean valid(); - } - boolean isExtendable(); boolean isWritable(); @@ -136,13 +87,11 @@ public interface RowSet { int rowCount(); - RowSetWriter writer(); - RowSetReader reader(); void clear(); - RowSetSchema schema(); + TupleMetadata schema(); BufferAllocator allocator(); @@ -157,17 +106,16 @@ public interface RowSet { * * @return memory size in bytes */ - long size(); - RowSet merge(RowSet other); + long size(); BatchSchema batchSchema(); /** * Row set that manages a single batch of rows. */ - interface SingleRowSet extends RowSet { - ValueVector[] vectors(); + + public interface SingleRowSet extends RowSet { SingleRowSet toIndirect(); SelectionVector2 getSv2(); } @@ -177,9 +125,10 @@ public interface RowSet { * Once writing is complete, the row set becomes an * immutable direct row set. 
*/ + interface ExtendableRowSet extends SingleRowSet { void allocate(int recordCount); - void setRowCount(int rowCount); + RowSetWriter writer(); RowSetWriter writer(int initialRowCount); } @@ -187,8 +136,8 @@ public interface RowSet { * Row set comprised of multiple single row sets, along with * an indirection vector (SV4). */ + interface HyperRowSet extends RowSet { SelectionVector4 getSv4(); - HyperVectorWrapper<ValueVector> getHyperVector(int i); } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java index 6f9a8d9..7b1554c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java @@ -19,7 +19,10 @@ package org.apache.drill.test.rowSet; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; /** @@ -40,14 +43,20 @@ public final class RowSetBuilder { private boolean withSv2; public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) { + this(allocator, TupleSchema.fromFields(schema), 10); + } + + public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema) { this(allocator, schema, 10); } - public RowSetBuilder(BufferAllocator allocator, BatchSchema schema, int capacity) { - rowSet = new DirectRowSet(allocator, schema); + public 
RowSetBuilder(BufferAllocator allocator, TupleMetadata schema, int capacity) { + rowSet = DirectRowSet.fromSchema(allocator, schema); writer = rowSet.writer(capacity); } + public TupleWriter writer() { return writer; } + /** * Add a new row using column values passed as variable-length arguments. Expects * map values to be flattened. a schema of (a:int, b:map(c:varchar)) would be> @@ -56,17 +65,18 @@ public final class RowSetBuilder { * <tt>add(10, new int[] {100, 200});</tt><br> * @param values column values in column index order * @return this builder - * @see {@link #addSingleCol(Object)} to create a row of a single column when - * the value to <tt>add()</tt> is ambiguous + * @throws IllegalStateException if the batch, or any vector in the batch, + * becomes full. This method is designed to be used in tests where we will + * seldom create a full vector of data. */ - public RowSetBuilder add(Object...values) { + public RowSetBuilder addRow(Object...values) { writer.setRow(values); return this; } /** - * The {@link #add(Object...)} method uses Java variable-length arguments to + * The {@link #addRow(Object...)} method uses Java variable-length arguments to * pass a row of values. But, when the row consists of a single array, Java * gets confused: is that an array for variable-arguments or is it the value * of the first argument? 
This method clearly states that the single value @@ -93,7 +103,7 @@ public final class RowSetBuilder { */ public RowSetBuilder addSingleCol(Object value) { - return add(new Object[] { value }); + return addRow(new Object[] { value }); } /** @@ -110,10 +120,10 @@ public final class RowSetBuilder { } public SingleRowSet build() { - writer.done(); + SingleRowSet result = writer.done(); if (withSv2) { - return rowSet.toIndirect(); + return result.toIndirect(); } - return rowSet; + return result; } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java index 6e72923..1cae64f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java @@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.apache.drill.exec.vector.accessor.ArrayReader; -import org.apache.drill.exec.vector.accessor.ColumnReader; -import org.apache.drill.test.rowSet.RowSet.RowSetReader; +import org.apache.drill.exec.vector.accessor.ObjectReader; +import org.apache.drill.exec.vector.accessor.ScalarElementReader; +import org.apache.drill.exec.vector.accessor.ScalarReader; +import org.apache.drill.exec.vector.accessor.TupleReader; import org.bouncycastle.util.Arrays; import java.util.Comparator; @@ -31,19 +33,48 @@ import java.util.Comparator; * For testing, compare the contents of two row sets (record batches) * to verify that they are identical. Supports masks to exclude certain * columns from comparison. 
+ * <p> + * Drill rows are analogous to JSON documents: they can have scalars, + * arrays and maps, with maps and lists holding maps, arrays and scalars. + * This class walks the row structure tree to compare each structure + * of two row sets checking counts, types and values to ensure that the + * "actual" result set (result of a test) matches the "expected" result + * set. + * <p> + * This class acts as an example of how to use the suite of reader + * abstractions. */ public class RowSetComparison { + /** + * Row set with the expected outcome of a test. This is the "golden" + * copy defined in the test itself. + */ private RowSet expected; + /** + * Some tests wish to ignore certain (top-level) columns. If a + * mask is provided, then only those columns with a <tt>true</tt> + * will be verified. + */ private boolean mask[]; + /** + * Floats and doubles do not compare exactly. This delta is used + * by JUnit for such comparisons. + */ private double delta = 0.001; + /** + * Tests can skip the first n rows. + */ private int offset; private int span = -1; public RowSetComparison(RowSet expected) { this.expected = expected; - mask = new boolean[expected.schema().hierarchicalAccess().count()]; + + // TODO: The mask only works at the top level presently + + mask = new boolean[expected.schema().size()]; for (int i = 0; i < mask.length; i++) { mask[i] = true; } @@ -134,7 +165,8 @@ public class RowSetComparison { for (int i = 0; i < testLength; i++) { er.next(); ar.next(); - verifyRow(er, ar); + String label = Integer.toString(er.index() + 1); + verifyRow(label, er, ar); } } @@ -167,22 +199,50 @@ public class RowSetComparison { } } - private void verifyRow(RowSetReader er, RowSetReader ar) { + private void verifyRow(String label, TupleReader er, TupleReader ar) { + String prefix = label + ":"; for (int i = 0; i < mask.length; i++) { if (! 
mask[i]) { continue; } - ColumnReader ec = er.column(i); - ColumnReader ac = ar.column(i); - String label = (er.index() + 1) + ":" + i; - assertEquals(label, ec.valueType(), ac.valueType()); - if (ec.isNull()) { - assertTrue(label + " - column not null", ac.isNull()); - continue; - } - if (! ec.isNull()) { - assertTrue(label + " - column is null", ! ac.isNull()); - } + verifyColumn(prefix + i, er.column(i), ar.column(i)); + } + } + + private void verifyColumn(String label, ObjectReader ec, ObjectReader ac) { + assertEquals(label, ec.type(), ac.type()); + switch (ec.type()) { + case ARRAY: + verifyArray(label, ec.array(), ac.array()); + break; + case SCALAR: + verifyScalar(label, ec.scalar(), ac.scalar()); + break; + case TUPLE: + verifyTuple(label, ec.tuple(), ac.tuple()); + break; + default: + throw new IllegalStateException( "Unexpected type: " + ec.type()); + } + } + + private void verifyTuple(String label, TupleReader er, TupleReader ar) { + assertEquals(label + " - tuple count", er.columnCount(), ar.columnCount()); + String prefix = label + ":"; + for (int i = 0; i < er.columnCount(); i++) { + verifyColumn(prefix + i, er.column(i), ar.column(i)); + } + } + + private void verifyScalar(String label, ScalarReader ec, ScalarReader ac) { + assertEquals(label + " - value type", ec.valueType(), ac.valueType()); + if (ec.isNull()) { + assertTrue(label + " - column not null", ac.isNull()); + return; + } + if (! ec.isNull()) { + assertTrue(label + " - column is null", ! 
ac.isNull()); + } switch (ec.valueType()) { case BYTES: { byte expected[] = ac.getBytes(); @@ -209,24 +269,42 @@ public class RowSetComparison { case PERIOD: assertEquals(label, ec.getPeriod(), ac.getPeriod()); break; - case ARRAY: - verifyArray(label, ec.array(), ac.array()); - break; default: throw new IllegalStateException( "Unexpected type: " + ec.valueType()); - } } } - private void verifyArray(String colLabel, ArrayReader ea, + private void verifyArray(String label, ArrayReader ea, ArrayReader aa) { + assertEquals(label, ea.entryType(), aa.entryType()); + assertEquals(label, ea.size(), aa.size()); + switch (ea.entryType()) { + case ARRAY: + throw new UnsupportedOperationException(); + case SCALAR: + verifyScalarArray(label, ea.elements(), aa.elements()); + break; + case TUPLE: + verifyTupleArray(label, ea, aa); + break; + default: + throw new IllegalStateException( "Unexpected type: " + ea.entryType()); + } + } + + private void verifyTupleArray(String label, ArrayReader ea, ArrayReader aa) { + for (int i = 0; i < ea.size(); i++) { + verifyTuple(label + "[" + i + "]", ea.tuple(i), aa.tuple(i)); + } + } + + private void verifyScalarArray(String colLabel, ScalarElementReader ea, + ScalarElementReader aa) { assertEquals(colLabel, ea.valueType(), aa.valueType()); assertEquals(colLabel, ea.size(), aa.size()); for (int i = 0; i < ea.size(); i++) { String label = colLabel + "[" + i + "]"; switch (ea.valueType()) { - case ARRAY: - throw new IllegalStateException("Arrays of arrays not supported yet"); case BYTES: { byte expected[] = ea.getBytes(i); byte actual[] = aa.getBytes(i); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java index 
42a7e63..e730987 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java @@ -20,8 +20,8 @@ package org.apache.drill.test.rowSet; import java.io.PrintStream; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema; -import org.apache.drill.test.rowSet.RowSet.RowSetReader; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.TupleMetadata; /** * Print a row set in CSV-like format. Primarily for debugging. @@ -41,21 +41,21 @@ public class RowSetPrinter { public void print(PrintStream out) { SelectionVectorMode selectionMode = rowSet.indirectionType(); RowSetReader reader = rowSet.reader(); - int colCount = reader.schema().count(); - printSchema(out, selectionMode); + int colCount = reader.schema().size(); + printSchema(out, selectionMode, reader); while (reader.next()) { printHeader(out, reader, selectionMode); for (int i = 0; i < colCount; i++) { if (i > 0) { out.print(", "); } - out.print(reader.getAsString(i)); + out.print(reader.column(i).getAsString()); } out.println(); } } - private void printSchema(PrintStream out, SelectionVectorMode selectionMode) { + private void printSchema(PrintStream out, SelectionVectorMode selectionMode, RowSetReader reader) { out.print("#"); switch (selectionMode) { case FOUR_BYTE: @@ -68,14 +68,24 @@ public class RowSetPrinter { break; } out.print(": "); - TupleSchema schema = rowSet.schema().hierarchicalAccess(); - for (int i = 0; i < schema.count(); i++) { + TupleMetadata schema = reader.schema(); + printTupleSchema(out, schema); + out.println(); + } + + private void printTupleSchema(PrintStream out, TupleMetadata schema) { + for (int i = 0; i < schema.size(); i++) { if (i > 0) { out.print(", "); } - out.print(schema.column(i).getName()); + ColumnMetadata colSchema = schema.metadata(i); + 
out.print(colSchema.name()); + if (colSchema.isMap()) { + out.print("("); + printTupleSchema(out, colSchema.mapSchema()); + out.print(")"); + } } - out.println(); } private void printHeader(PrintStream out, RowSetReader reader, SelectionVectorMode selectionMode) { http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java new file mode 100644 index 0000000..3e27529 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test.rowSet; + +import org.apache.drill.exec.vector.accessor.TupleReader; + +/** + * Reader for all types of row sets. + */ + +public interface RowSetReader extends TupleReader { + + /** + * Total number of rows in the row set. 
+ * @return total number of rows + */ + int rowCount(); + + boolean next(); + int index(); + void set(int index); + + /** + * Batch index: 0 for a single batch, batch for the current + * row is a hyper-batch. + * @return index of the batch for the current row + */ + int batchIndex(); + + /** + * The index of the underlying row which may be indexed by an + * Sv2 or Sv4. + * + * @return + */ + + int rowIndex(); + boolean valid(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java new file mode 100644 index 0000000..2bae085 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.test.rowSet; + +import java.util.List; + +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; +import org.apache.drill.exec.vector.accessor.reader.AbstractTupleReader; + +/** + * Reader implementation for a row set. + */ + +public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReader { + + protected final ReaderIndex readerIndex; + + public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index, AbstractObjectReader[] readers) { + super(schema, readers); + this.readerIndex = index; + bindIndex(index); + } + + public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index, + List<AbstractObjectReader> readers) { + this(schema, index, + readers.toArray(new AbstractObjectReader[readers.size()])); + } + + @Override + public boolean next() { + if (! readerIndex.next()) { + return false; + } + reposition(); + return true; + } + + @Override + public boolean valid() { return readerIndex.valid(); } + + @Override + public int index() { return readerIndex.position(); } + + @Override + public int rowCount() { return readerIndex.size(); } + + @Override + public int rowIndex() { return readerIndex.vectorIndex(); } + + @Override + public int batchIndex() { return readerIndex.batchIndex(); } + + @Override + public void set(int index) { + this.readerIndex.set(index); + reposition(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java deleted file mode 100644 index 55b5f12..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java +++ 
/dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.test.rowSet; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema; -import org.apache.drill.exec.record.MaterializedField; - -/** - * Row set schema presented as a number of distinct "views" for various - * purposes: - * <ul> - * <li>Batch schema: the schema used by a VectorContainer.</li> - * <li>Physical schema: the schema expressed as a hierarchy of - * tuples with the top tuple representing the row, nested tuples - * representing maps.</li> - * <li>Access schema: a flattened schema with all scalar columns - * at the top level, and with map columns pulled out into a separate - * collection. The flattened-scalar view is the one used to write to, - * and read from, the row set.</li> - * </ul> - * Allows easy creation of multiple row sets from the same schema. 
- * Each schema is immutable, which is fine for tests in which we - * want known inputs and outputs. - */ - -public class RowSetSchema { - - /** - * Logical description of a column. A logical column is a - * materialized field. For maps, also includes a logical schema - * of the map. - */ - - public static class LogicalColumn { - protected final String fullName; - protected final int accessIndex; - protected int flatIndex; - protected final MaterializedField field; - - /** - * Schema of the map. Includes only those fields directly within - * the map; does not include fields from nested tuples. - */ - - protected PhysicalSchema mapSchema; - - public LogicalColumn(String fullName, int accessIndex, MaterializedField field) { - this.fullName = fullName; - this.accessIndex = accessIndex; - this.field = field; - } - - private void updateStructure(int index, PhysicalSchema children) { - flatIndex = index; - mapSchema = children; - } - - public int accessIndex() { return accessIndex; } - public int flatIndex() { return flatIndex; } - public boolean isMap() { return mapSchema != null; } - public PhysicalSchema mapSchema() { return mapSchema; } - public MaterializedField field() { return field; } - public String fullName() { return fullName; } - } - - /** - * Implementation of a tuple name space. Tuples allow both indexed and - * named access to their members. 
- * - * @param <T> the type of object representing each column - */ - - public static class NameSpace<T> { - private final Map<String,Integer> nameSpace = new HashMap<>(); - private final List<T> columns = new ArrayList<>(); - - public int add(String key, T value) { - int index = columns.size(); - nameSpace.put(key, index); - columns.add(value); - return index; - } - - public T get(int index) { - return columns.get(index); - } - - public T get(String key) { - int index = getIndex(key); - if (index == -1) { - return null; - } - return get(index); - } - - public int getIndex(String key) { - Integer index = nameSpace.get(key); - if (index == null) { - return -1; - } - return index; - } - - public int count() { return columns.size(); } - } - - /** - * Provides a non-flattened, physical view of the schema. The top-level - * row includes maps, maps expand to a nested tuple schema. This view - * corresponds, more-or-less, to the physical storage of vectors in - * a vector accessible or vector container. - */ - - private static class TupleSchemaImpl implements TupleSchema { - - private NameSpace<LogicalColumn> columns; - - public TupleSchemaImpl(NameSpace<LogicalColumn> ns) { - this.columns = ns; - } - - @Override - public MaterializedField column(int index) { - return logicalColumn(index).field(); - } - - public LogicalColumn logicalColumn(int index) { return columns.get(index); } - - @Override - public MaterializedField column(String name) { - LogicalColumn col = columns.get(name); - return col == null ? null : col.field(); - } - - @Override - public int columnIndex(String name) { - return columns.getIndex(name); - } - - @Override - public int count() { return columns.count(); } - } - - /** - * Represents the flattened view of the schema used to get and set columns. - * Represents a left-to-right, depth-first traversal of the row and map - * columns. Holds only materialized vectors (non-maps). 
For completeness, - * provides access to maps also via separate methods, but this is generally - * of little use. - */ - - public static class FlattenedSchema extends TupleSchemaImpl { - protected final TupleSchemaImpl maps; - - public FlattenedSchema(NameSpace<LogicalColumn> cols, NameSpace<LogicalColumn> maps) { - super(cols); - this.maps = new TupleSchemaImpl(maps); - } - - public LogicalColumn logicalMap(int index) { return maps.logicalColumn(index); } - public MaterializedField map(int index) { return maps.column(index); } - public MaterializedField map(String name) { return maps.column(name); } - public int mapIndex(String name) { return maps.columnIndex(name); } - public int mapCount() { return maps.count(); } - } - - /** - * Physical schema of a row set showing the logical hierarchy of fields - * with map fields as first-class fields. Map members appear as children - * under the map, much as they appear in the physical value-vector - * implementation. - */ - - public static class PhysicalSchema { - protected final NameSpace<LogicalColumn> schema = new NameSpace<>(); - - public LogicalColumn column(int index) { - return schema.get(index); - } - - public LogicalColumn column(String name) { - return schema.get(name); - } - - public int count() { return schema.count(); } - - public NameSpace<LogicalColumn> nameSpace() { return schema; } - } - - private static class SchemaExpander { - private final PhysicalSchema physicalSchema; - private final NameSpace<LogicalColumn> cols = new NameSpace<>(); - private final NameSpace<LogicalColumn> maps = new NameSpace<>(); - - public SchemaExpander(BatchSchema schema) { - physicalSchema = expand("", schema); - } - - private PhysicalSchema expand(String prefix, Iterable<MaterializedField> fields) { - PhysicalSchema physical = new PhysicalSchema(); - for (MaterializedField field : fields) { - String name = prefix + field.getName(); - int index; - LogicalColumn colSchema = new LogicalColumn(name, physical.count(), field); - 
physical.schema.add(field.getName(), colSchema); - PhysicalSchema children = null; - if (field.getType().getMinorType() == MinorType.MAP) { - index = maps.add(name, colSchema); - children = expand(name + ".", field.getChildren()); - } else { - index = cols.add(name, colSchema); - } - colSchema.updateStructure(index, children); - } - return physical; - } - } - - private final BatchSchema batchSchema; - private final TupleSchemaImpl accessSchema; - private final FlattenedSchema flatSchema; - private final PhysicalSchema physicalSchema; - - public RowSetSchema(BatchSchema schema) { - batchSchema = schema; - SchemaExpander expander = new SchemaExpander(schema); - physicalSchema = expander.physicalSchema; - accessSchema = new TupleSchemaImpl(physicalSchema.nameSpace()); - flatSchema = new FlattenedSchema(expander.cols, expander.maps); - } - - /** - * A hierarchical schema that includes maps, with maps expanding - * to a nested tuple schema. Not used at present; this is intended - * to be the bases of non-flattened accessors if we find the need. - * @return the hierarchical access schema - */ - - public TupleSchema hierarchicalAccess() { return accessSchema; } - - /** - * A flattened (left-to-right, depth-first traversal) of the non-map - * columns in the row. Used to define the column indexes in the - * get methods for row readers and the set methods for row writers. - * @return the flattened access schema - */ - - public FlattenedSchema flatAccess() { return flatSchema; } - - /** - * Internal physical schema in hierarchical order. Mostly used to create - * the other schemas, but may be of use in special cases. Has the same - * structure as the batch schema, but with additional information. - * @return a tree-structured physical schema - */ - - public PhysicalSchema physical() { return physicalSchema; } - - /** - * The batch schema used by the Drill runtime. Represents a tree-structured - * list of top-level fields, including maps. Maps contain a nested schema. 
- * @return the batch schema used by the Drill runtime - */ - - public BatchSchema batch() { return batchSchema; } - - /** - * Convert this schema to a new batch schema that includes the specified - * selection vector mode. - * @param svMode selection vector mode for the new schema - * @return the new batch schema - */ - - public BatchSchema toBatchSchema(SelectionVectorMode svMode) { - List<MaterializedField> fields = new ArrayList<>(); - for (MaterializedField field : batchSchema) { - fields.add(field); - } - return new BatchSchema(svMode, fields); - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java index 261a9c1..32b61ca 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java @@ -17,12 +17,18 @@ */ package org.apache.drill.test.rowSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; + +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.vector.accessor.AccessorUtilities; -import org.apache.drill.exec.vector.accessor.ColumnAccessor.ValueType; -import org.apache.drill.exec.vector.accessor.ColumnWriter; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.ValueType; +import org.bouncycastle.util.Arrays; import 
org.joda.time.Duration; import org.joda.time.Period; @@ -62,11 +68,42 @@ public class RowSetUtilities { */ public static void setFromInt(RowSetWriter rowWriter, int index, int value) { - ColumnWriter writer = rowWriter.column(index); - if (writer.valueType() == ValueType.PERIOD) { - setPeriodFromInt(writer, rowWriter.schema().column(index).getType().getMinorType(), value); - } else { - AccessorUtilities.setFromInt(writer, value); + ScalarWriter writer = rowWriter.scalar(index); + MaterializedField field = rowWriter.schema().column(index); + writer.setObject(testDataFromInt(writer.valueType(), field.getType(), value)); + } + + public static Object testDataFromInt(ValueType valueType, MajorType dataType, int value) { + switch (valueType) { + case BYTES: + return Integer.toHexString(value).getBytes(); + case DOUBLE: + return (double) value; + case INTEGER: + switch (dataType.getMinorType()) { + case BIT: + return value & 0x01; + case SMALLINT: + return value % 32768; + case UINT2: + return value & 0xFFFF; + case TINYINT: + return value % 128; + case UINT1: + return value & 0xFF; + default: + return value; + } + case LONG: + return (long) value; + case STRING: + return Integer.toString(value); + case DECIMAL: + return BigDecimal.valueOf(value, dataType.getScale()); + case PERIOD: + return periodFromInt(dataType.getMinorType(), value); + default: + throw new IllegalStateException("Unknown writer type: " + valueType); } } @@ -81,26 +118,56 @@ public class RowSetUtilities { * @param writer column writer for a period column * @param minorType the Drill data type * @param value the integer value to apply + * @throws VectorOverflowException */ - public static void setPeriodFromInt(ColumnWriter writer, MinorType minorType, - int value) { + public static Period periodFromInt(MinorType minorType, int value) { switch (minorType) { case INTERVAL: - writer.setPeriod(Duration.millis(value).toPeriod()); - break; + return Duration.millis(value).toPeriod(); case INTERVALYEAR: - 
writer.setPeriod(Period.years(value / 12).withMonths(value % 12)); - break; + return Period.years(value / 12).withMonths(value % 12); case INTERVALDAY: int sec = value % 60; value = value / 60; int min = value % 60; value = value / 60; - writer.setPeriod(Period.days(value).withMinutes(min).withSeconds(sec)); - break; + return Period.days(value).withMinutes(min).withSeconds(sec); default: throw new IllegalArgumentException("Writer is not an interval: " + minorType); } } + + public static void assertEqualValues(ValueType type, Object expectedObj, Object actualObj) { + assertEqualValues(type.toString(), type, expectedObj, actualObj); + } + + public static void assertEqualValues(String msg, ValueType type, Object expectedObj, Object actualObj) { + switch (type) { + case BYTES: { + byte expected[] = (byte[]) expectedObj; + byte actual[] = (byte[]) actualObj; + assertEquals(msg + " - byte lengths differ", expected.length, actual.length); + assertTrue(msg, Arrays.areEqual(expected, actual)); + break; + } + case DOUBLE: + assertEquals(msg, (double) expectedObj, (double) actualObj, 0.0001); + break; + case INTEGER: + case LONG: + case STRING: + case DECIMAL: + assertEquals(msg, expectedObj, actualObj); + break; + case PERIOD: { + Period expected = (Period) expectedObj; + Period actual = (Period) actualObj; + assertEquals(msg, expected.normalizedStandard(), actual.normalizedStandard()); + break; + } + default: + throw new IllegalStateException( "Unexpected type: " + type); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java new file mode 100644 index 0000000..874c0e1 --- /dev/null +++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test.rowSet; + +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; + +/** + * Interface for writing values to a row set. Only available + * for newly-created, single, direct row sets. Eventually, if + * we want to allow updating a row set, we have to create a + * new row set with the updated columns, then merge the new + * and old row sets to create a new immutable row set. + * <p> + * Typical usage: + * <pre><code> + * void writeABatch() { + * RowSetWriter writer = ... + * while (! writer.isFull()) { + * writer.scalar(0).setInt(10); + * writer.scalar(1).setString("foo"); + * ... + * writer.save(); + * } + * }</code></pre> + * The above writes until the batch is full, based on size. If values + * are large enough to potentially cause vector overflow, do the + * following instead: + * <pre><code> + * void writeABatch() { + * RowSetWriter writer = ... + * while (!
writer.isFull()) { + * writer.scalar(0).setInt(10); + * try { + * writer.scalar(1).setString("foo"); + * } catch (VectorOverflowException e) { break; } + * ... + * writer.save(); + * } + * // Do something with the partially-written last row. + * }</code></pre> + * <p> + * This writer is for testing, so no provision is available to handle a + * partial last row. (Elsewhere in Drill there are classes that handle that case.) + */ + +public interface RowSetWriter extends TupleWriter { + + /** + * Write a row of values, given by Java objects. Object type must + * match expected column type. Stops writing, and returns false, + * if any value causes vector overflow. Value format: + * <ul> + * <li>For scalars, the value as a suitable Java type (int or + * Integer, say, for <tt>INTEGER</tt> values.)</li> + * <li>For scalar arrays, an array of a suitable Java primitive type + * for scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt> + * column.</li> + * <li>For a Map, an <tt>Object</tt> array with values encoded as above. + * (In fact, the list here is the same as the map format.)</li> + * <li>For a list (repeated map, list of list), an <tt>Object</tt> + * array with values encoded as above. (So, for a repeated map, an outer + * <tt>Object</tt> array encodes the array, an inner one encodes the + * map members.)</li> + * </ul> + * + * @param values variable-length argument list of column values + */ + + void setRow(Object...values); + + /** + * Indicates if the current row position is no longer valid for + * writing. Will be false on the first row, and all subsequent + * rows until either the maximum number of rows is written, + * or a vector overflows. After that, will return true. The + * method returns true as soon as any column writer overflows + * even in the middle of a row write. That is, this writer + * does not automatically handle overflow rows because that + * added complexity is seldom needed for tests.
+ * + * @return false if the current row can be written, true + * if not + */ + + boolean isFull(); + int rowIndex(); + + /** + * Saves the current row and moves to the next row. + * Done automatically if using <tt>setRow()</tt>. + */ + + void save(); + + /** + * Finish writing and finalize the row set being + * written. + * @return the completed, read-only row set without a + * selection vector + */ + + SingleRowSet done(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java new file mode 100644 index 0000000..074842d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.test.rowSet; + +import java.util.List; + +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter; +import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; + +/** + * Implementation of a row set writer. Only available for newly-created, + * empty, direct, single row sets. Rewriting is not allowed, nor is writing + * to a hyper row set. + */ + +public class RowSetWriterImpl extends AbstractTupleWriter implements RowSetWriter { + + /** + * Writer index that points to each row in the row set. The index starts at + * the 0th row and advances one row on each increment. This allows writers to + * start positioned at the first row. Writes happen in the current row. + * Calling <tt>next()</tt> advances to the next position, effectively saving + * the current row. The most recent row can be abandoned easily simply by not + * calling <tt>next()</tt>. This means that the number of completed rows is + * the same as the row index. + */ + + static class WriterIndexImpl implements ColumnWriterIndex { + + public enum State { OK, VECTOR_OVERFLOW, END_OF_BATCH } + + private int rowIndex = 0; + private State state = State.OK; + + @Override + public final int vectorIndex() { return rowIndex; } + + public final boolean next() { + if (++rowIndex < ValueVector.MAX_ROW_COUNT) { + return true; + } + // Should not call next() again once batch is full. + assert rowIndex == ValueVector.MAX_ROW_COUNT; + rowIndex = ValueVector.MAX_ROW_COUNT; + state = state == State.OK ? State.END_OF_BATCH : state; + return false; + } + + public int size() { + // The index always points to the next slot past the + // end of valid rows. 
+ return rowIndex; + } + + public boolean valid() { return state == State.OK; } + + public boolean hasOverflow() { return state == State.VECTOR_OVERFLOW; } + + @Override + public final void nextElement() { } + + @Override + public void rollover() { + throw new UnsupportedOperationException("Rollover not supported in the row set writer."); + } + + @Override + public int rowStartIndex() { return rowIndex; } + + @Override + public ColumnWriterIndex outerIndex() { return null; } + + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" state = ") + .append(state) + .append(", rowIndex = ") + .append(rowIndex) + .append("]") + .toString(); + } + } + + private final WriterIndexImpl writerIndex; + private final ExtendableRowSet rowSet; + + protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleMetadata schema, WriterIndexImpl index, List<AbstractObjectWriter> writers) { + super(schema, writers); + this.rowSet = rowSet; + this.writerIndex = index; + bindIndex(index); + startWrite(); + startRow(); + } + + @Override + public void setRow(Object...values) { + setObject(values); + save(); + } + + @Override + public int rowIndex() { return writerIndex.vectorIndex(); } + + @Override + public void save() { + endArrayValue(); + saveRow(); + + // For convenience, start a new row after each save. + // The last (unused) row is abandoned when the batch is full. + + if (writerIndex.next()) { + startRow(); + } + } + + @Override + public boolean isFull( ) { return ! writerIndex.valid(); } + + @Override + public SingleRowSet done() { + endWrite(); + rowSet.container().setRecordCount(writerIndex.vectorIndex()); + return rowSet; + } + + @Override + public int lastWriteIndex() { + return writerIndex.vectorIndex(); + } +}