http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java new file mode 100644 index 0000000..27a88f0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; + +/** + * Defines the schema of a tuple: either the top-level row or a nested + * "map" (really structure). A schema is a collection of columns (backed + * by vectors in the loader itself.) Columns are accessible by name or + * index. 
New columns may be added at any time; the new column takes the + * next available index. + */ + +public class TupleSchema implements TupleMetadata { + + /** + * Abstract definition of column metadata. Allows applications to create + * specialized forms of a column metadata object by extending from this + * abstract class. + * <p> + * Note that, by design, primitive columns do not have a link to their + * tuple parent, or their index within that parent. This allows the same + * metadata to be shared between two views of a tuple, perhaps physical + * and projected views. This restriction does not apply to map columns, + * since maps (and the row itself) will, by definition, differ between + * the two views. + */ + + public static abstract class AbstractColumnMetadata implements ColumnMetadata { + + protected MaterializedField schema; + protected boolean projected = true; + + /** + * Predicted number of elements per array entry. Default is + * taken from the often hard-coded value of 10. + */ + + protected int expectedElementCount = 1; + + public AbstractColumnMetadata(MaterializedField schema) { + this.schema = schema; + if (isArray()) { + expectedElementCount = DEFAULT_ARRAY_SIZE; + } + } + + public AbstractColumnMetadata(AbstractColumnMetadata from) { + schema = from.schema; + expectedElementCount = from.expectedElementCount; + } + + protected void bind(TupleSchema parentTuple) { } + + @Override + public MaterializedField schema() { return schema; } + + public void replaceField(MaterializedField field) { + this.schema = field; + } + @Override + public String name() { return schema().getName(); } + + @Override + public MajorType majorType() { return schema().getType(); } + + @Override + public MinorType type() { return schema().getType().getMinorType(); } + + @Override + public DataMode mode() { return schema().getDataMode(); } + + @Override + public boolean isNullable() { return mode() == DataMode.OPTIONAL; } + + @Override + public boolean isArray() { return mode() 
== DataMode.REPEATED; } + + @Override + public boolean isList() { return false; } + + @Override + public boolean isVariableWidth() { + MinorType type = type(); + return type == MinorType.VARCHAR || type == MinorType.VAR16CHAR || type == MinorType.VARBINARY; + } + + @Override + public boolean isEquivalent(ColumnMetadata other) { + return schema().isEquivalent(other.schema()); + } + + @Override + public int expectedWidth() { return 0; } + + @Override + public void setExpectedWidth(int width) { } + + @Override + public void setExpectedElementCount(int childCount) { + // The allocation utilities don't like an array size of zero, so set to + // 1 as the minimum. Adjusted to avoid trivial errors if the caller + // makes an error. + + if (isArray()) { + expectedElementCount = Math.max(1, childCount); + } + } + + @Override + public int expectedElementCount() { return expectedElementCount; } + + @Override + public void setProjected(boolean projected) { + this.projected = projected; + } + + @Override + public boolean isProjected() { return projected; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" ") + .append(schema().toString()) + .append(",") + .append(projected ? "" : "not ") + .append("projected"); + if (isArray()) { + buf.append(", cardinality: ") + .append(expectedElementCount); + } + return buf + .append("]") + .toString(); + } + + public abstract AbstractColumnMetadata copy(); + } + + /** + * Primitive (non-map) column. Describes non-nullable, nullable and + * array types (which differ only in mode, but not in metadata structure.) 
+ */ + + public static class PrimitiveColumnMetadata extends AbstractColumnMetadata { + + protected int expectedWidth; + + public PrimitiveColumnMetadata(MaterializedField schema) { + super(schema); + expectedWidth = TypeHelper.getSize(majorType()); + if (isVariableWidth()) { + + // The above getSize() method uses the deprecated getWidth() + // method to get the expected VarChar size. If zero (which + // it will be), try the revised precision field. + + int precision = majorType().getPrecision(); + if (precision > 0) { + expectedWidth = precision; + } else { + // TypeHelper includes the offset vector width + + expectedWidth = expectedWidth - 4; + } + } + } + + public PrimitiveColumnMetadata(PrimitiveColumnMetadata from) { + super(from); + expectedWidth = from.expectedWidth; + } + + @Override + public AbstractColumnMetadata copy() { + return new PrimitiveColumnMetadata(this); + } + + @Override + public ColumnMetadata.StructureType structureType() { return ColumnMetadata.StructureType.PRIMITIVE; } + + @Override + public TupleMetadata mapSchema() { return null; } + + @Override + public boolean isMap() { return false; } + + @Override + public int expectedWidth() { return expectedWidth; } + + @Override + public void setExpectedWidth(int width) { + // The allocation utilities don't like a width of zero, so set to + // 1 as the minimum. Adjusted to avoid trivial errors if the caller + // makes an error. + + if (isVariableWidth()) { + expectedWidth = Math.max(1, width); + } + } + + @Override + public ColumnMetadata cloneEmpty() { + return new PrimitiveColumnMetadata(this); + } + } + + /** + * Describes a map and repeated map. Both are tuples that have a tuple + * schema as part of the column definition. 
+ */ + + public static class MapColumnMetadata extends AbstractColumnMetadata { + private TupleMetadata parentTuple; + private final TupleSchema mapSchema; + + /** + * Build a new map column from the field provided + * + * @param schema materialized field description of the map + */ + + public MapColumnMetadata(MaterializedField schema) { + this(schema, null); + } + + /** + * Build a map column from the materialized field provided, using the + * given tuple schema for the map's columns. If the tuple schema is + * null, a new, empty tuple schema is created. + * + * @param schema materialized field description of the map + * @param mapSchema tuple schema for the map's columns, or null to create an empty one + */ + + private MapColumnMetadata(MaterializedField schema, TupleSchema mapSchema) { + super(schema); + if (mapSchema == null) { + this.mapSchema = new TupleSchema(); + } else { + this.mapSchema = mapSchema; + } + this.mapSchema.bind(this); + } + + @Override + public AbstractColumnMetadata copy() { + return new MapColumnMetadata(schema, (TupleSchema) mapSchema.copy()); + } + + @Override + protected void bind(TupleSchema parentTuple) { + this.parentTuple = parentTuple; + } + + @Override + public ColumnMetadata.StructureType structureType() { return ColumnMetadata.StructureType.TUPLE; } + + @Override + public TupleMetadata mapSchema() { return mapSchema; } + + @Override + public int expectedWidth() { return 0; } + + @Override + public boolean isMap() { return true; } + + public TupleMetadata parentTuple() { return parentTuple; } + + public TupleSchema mapSchemaImpl() { return mapSchema; } + + @Override + public ColumnMetadata cloneEmpty() { + return new MapColumnMetadata(schema().cloneEmpty(), null); + } + } + + private MapColumnMetadata parentMap; + private final TupleNameSpace<ColumnMetadata> nameSpace = new TupleNameSpace<>(); + + public void bind(MapColumnMetadata parentMap) { + this.parentMap = parentMap; + } + + public static TupleSchema fromFields(Iterable<MaterializedField> fields) { + TupleSchema tuple = new TupleSchema(); + for (MaterializedField
field : fields) { + tuple.add(field); + } + return tuple; + } + + public TupleMetadata copy() { + TupleMetadata tuple = new TupleSchema(); + for (ColumnMetadata md : this) { + tuple.addColumn(((AbstractColumnMetadata) md).copy()); + } + return tuple; + } + + /** + * Create a column metadata object that holds the given + * {@link MaterializedField}. The type of the object will be either a + * primitive or map column, depending on the field's type. + * + * @param field the materialized field to wrap + * @return the column metadata that wraps the field + */ + + public static AbstractColumnMetadata fromField(MaterializedField field) { + if (field.getType().getMinorType() == MinorType.MAP) { + return newMap(field); + } else { + return new PrimitiveColumnMetadata(field); + } + } + + public static AbstractColumnMetadata fromView(MaterializedField field) { + if (field.getType().getMinorType() == MinorType.MAP) { + return new MapColumnMetadata(field, null); + } else { + return new PrimitiveColumnMetadata(field); + } + } + + /** + * Create a tuple given the list of columns that make up the tuple. + * Creates nested maps as needed. + * + * @param columns list of columns that make up the tuple + * @return a tuple metadata object that contains the columns + */ + + public static TupleSchema fromColumns(List<ColumnMetadata> columns) { + TupleSchema tuple = new TupleSchema(); + for (ColumnMetadata column : columns) { + tuple.add((AbstractColumnMetadata) column); + } + return tuple; + } + + /** + * Create a column metadata object for a map column, given the + * {@link MaterializedField} that describes the column, and a list + * of column metadata objects that describe the columns in the map. 
+ * + * @param field the materialized field that describes the map column + * @param schema metadata that describes the tuple of columns in + * the map + * @return a map column metadata for the map + */ + + public static MapColumnMetadata newMap(MaterializedField field, TupleSchema schema) { + return new MapColumnMetadata(field, schema); + } + + public static MapColumnMetadata newMap(MaterializedField field) { + return new MapColumnMetadata(field, fromFields(field.getChildren())); + } + + @Override + public ColumnMetadata add(MaterializedField field) { + AbstractColumnMetadata md = fromField(field); + add(md); + return md; + } + + public ColumnMetadata addView(MaterializedField field) { + AbstractColumnMetadata md = fromView(field); + add(md); + return md; + } + + /** + * Add a column metadata object created by the caller. Used for specialized + * cases beyond those handled by {@link #add(MaterializedField)}. + * + * @param md the custom column metadata which must have the correct + * index set (from {@link #size()}) + */ + + public void add(AbstractColumnMetadata md) { + md.bind(this); + nameSpace.add(md.name(), md); + if (parentMap != null) { + parentMap.schema.addChild(md.schema()); + } + } + + @Override + public int addColumn(ColumnMetadata column) { + add((AbstractColumnMetadata) column); + return size() - 1; + } + + @Override + public MaterializedField column(String name) { + ColumnMetadata md = metadata(name); + return md == null ?
null : md.schema(); + } + + @Override + public ColumnMetadata metadata(String name) { + return nameSpace.get(name); + } + + @Override + public int index(String name) { + return nameSpace.indexOf(name); + } + + @Override + public MaterializedField column(int index) { + return metadata(index).schema(); + } + + @Override + public ColumnMetadata metadata(int index) { + return nameSpace.get(index); + } + + @Override + public MapColumnMetadata parent() { return parentMap; } + + @Override + public int size() { return nameSpace.count(); } + + @Override + public boolean isEmpty() { return nameSpace.count( ) == 0; } + + @Override + public Iterator<ColumnMetadata> iterator() { + return nameSpace.iterator(); + } + + @Override + public boolean isEquivalent(TupleMetadata other) { + TupleSchema otherSchema = (TupleSchema) other; + if (nameSpace.count() != otherSchema.nameSpace.count()) { + return false; + } + for (int i = 0; i < nameSpace.count(); i++) { + if (! nameSpace.get(i).isEquivalent(otherSchema.nameSpace.get(i))) { + return false; + } + } + return true; + } + + @Override + public List<MaterializedField> toFieldList() { + List<MaterializedField> cols = new ArrayList<>(); + for (ColumnMetadata md : nameSpace) { + cols.add(md.schema()); + } + return cols; + } + + public BatchSchema toBatchSchema(SelectionVectorMode svMode) { + return new BatchSchema(svMode, toFieldList()); + } + + @Override + public String fullName(int index) { + return fullName(metadata(index)); + } + + @Override + public String fullName(ColumnMetadata column) { + String quotedName = column.name(); + if (quotedName.contains(".")) { + quotedName = "`" + quotedName + "`"; + } + if (isRoot()) { + return column.name(); + } else { + return fullName() + "." 
+ quotedName; + } + } + + public String fullName() { + if (isRoot()) { + return "<root>"; + } else { + return parentMap.parentTuple().fullName(parentMap); + } + } + + public boolean isRoot() { return parentMap == null; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" "); + boolean first = true; + for (ColumnMetadata md : nameSpace) { + if (! first) { + buf.append(", "); + } + buf.append(md.toString()); + } + buf.append("]"); + return buf.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index a38a7fe..42f3473 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -24,10 +24,16 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.record.DeadBuf; /** - * A selection vector that fronts, at most, a + * A selection vector that fronts, at most, 64K values. + * The selection vector is used for two cases: + * <ol> + * <li>To create a list of values retained by a filter.</li> + * <li>To provide a redirection level for sorted + * batches.</li> + * </ol> */ + public class SelectionVector2 implements AutoCloseable { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class); private final BufferAllocator allocator; private int recordCount; @@ -39,9 +45,19 @@ public class SelectionVector2 implements AutoCloseable { this.allocator = allocator; } + /** + * Create a selection vector with the given buffer. The selection vector + * increments the buffer's reference count, taking ownership of the buffer.
+ * + * @param allocator allocator used to allocate the buffer + * @param buf the buffer containing the selection vector's data + * @param count the number of values in the selection vector + */ + public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) { this.allocator = allocator; buffer = buf; + buffer.retain(1); recordCount = count; } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java index cfb8645..a283924 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java @@ -27,15 +27,16 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.cache.VectorSerializer.Reader; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.DrillTest; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; import org.apache.drill.test.rowSet.RowSetUtilities; +import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.SchemaBuilder; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -73,7 +74,7 @@ public class TestBatchSerialization extends DrillTest { if (i % 2 == 0) { RowSetUtilities.setFromInt(writer, 0, i); } else { - writer.column(0).setNull(); + writer.scalar(0).setNull(); } 
writer.save(); } @@ -125,9 +126,8 @@ public class TestBatchSerialization extends DrillTest { RowSet result; try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) { - result = fixture.wrap( - VectorSerializer.reader(fixture.allocator(), in) - .read()); + Reader reader = VectorSerializer.reader(fixture.allocator(), in); + result = fixture.wrap(reader.read(), reader.sv2()); } new RowSetComparison(expected) @@ -163,17 +163,17 @@ public class TestBatchSerialization extends DrillTest { private SingleRowSet buildMapSet(BatchSchema schema) { return fixture.rowSetBuilder(schema) - .add(1, 100, "first") - .add(2, 200, "second") - .add(3, 300, "third") + .addRow(1, new Object[] {100, "first"}) + .addRow(2, new Object[] {200, "second"}) + .addRow(3, new Object[] {300, "third"}) .build(); } private SingleRowSet buildArraySet(BatchSchema schema) { return fixture.rowSetBuilder(schema) - .add(1, new String[] { "first, second, third" } ) - .add(2, null) - .add(3, new String[] { "third, fourth, fifth" } ) + .addRow(1, new String[] { "first, second, third" } ) + .addRow(2, null) + .addRow(3, new String[] { "third, fourth, fifth" } ) .build(); } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java index fa6e318..e7d0a97 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java @@ -80,16 +80,16 @@ public class TopNBatchTest extends PopUnitTestBase { try (RootAllocator allocator = new RootAllocator(100_000_000)) { expectedRowSet = new RowSetBuilder(allocator, batchSchema) - 
.add(110, 10) - .add(109, 9) - .add(108, 8) - .add(107, 7) - .add(106, 6) - .add(105, 5) - .add(104, 4) - .add(103, 3) - .add(102, 2) - .add(101, 1) + .addRow(110, 10) + .addRow(109, 9) + .addRow(108, 8) + .addRow(107, 7) + .addRow(106, 6) + .addRow(105, 5) + .addRow(104, 4) + .addRow(103, 3) + .addRow(102, 2) + .addRow(101, 1) .build(); PriorityQueue queue; @@ -121,10 +121,10 @@ public class TopNBatchTest extends PopUnitTestBase { for (int batchCounter = 0; batchCounter < numBatches; batchCounter++) { RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, batchSchema); - rowSetBuilder.add((batchCounter + bound), batchCounter); + rowSetBuilder.addRow((batchCounter + bound), batchCounter); for (int recordCounter = 0; recordCounter < numRecordsPerBatch; recordCounter++) { - rowSetBuilder.add(random.nextInt(bound), random.nextInt(bound)); + rowSetBuilder.addRow(random.nextInt(bound), random.nextInt(bound)); } VectorContainer vectorContainer = rowSetBuilder.build().container(); @@ -135,7 +135,7 @@ public class TopNBatchTest extends PopUnitTestBase { VectorContainer resultContainer = queue.getHyperBatch(); resultContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE); - RowSet.HyperRowSet actualHyperSet = new HyperRowSetImpl(allocator, resultContainer, queue.getFinalSv4()); + RowSet.HyperRowSet actualHyperSet = new HyperRowSetImpl(resultContainer, queue.getFinalSv4()); new RowSetComparison(expectedRowSet).verify(actualHyperSet); } finally { if (expectedRowSet != null) { http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java index eafb4c8..202a0f1 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java @@ -68,10 +68,10 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add(10, 100) - .add(20, 120) - .add(30, null) - .add(40, 140) + .addRow(10, 100) + .addRow(20, 120) + .addRow(30, null) + .addRow(40, 140) .build(); BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true); @@ -88,10 +88,10 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("col1.1", "col1.2") - .add("col2.1", "col2.2") - .add("col3.1", null) - .add("col4.1", "col4.2") + .addRow("col1.1", "col1.2") + .addRow("col2.1", "col2.2") + .addRow("col3.1", null) + .addRow("col4.1", "col4.2") .build(); BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true); @@ -108,9 +108,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add(new int[] {}, new String[] {}) - .add(new int[] {1, 2, 3}, new String[] {"fred", "barney", "wilma"}) - .add(new int[] {4}, new String[] {"dino"}) + .addRow(new int[] {}, new String[] {}) + .addRow(new int[] {1, 2, 3}, new String[] {"fred", "barney", "wilma"}) + .addRow(new int[] {4}, new String[] {"dino"}) .build(); BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true); @@ -126,9 +126,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("x") - .add("y") - .add("z") + .addRow("x") + .addRow("y") + .addRow("z") .build(); // Here we are evil: stomp on the last offset to simulate corruption. 
@@ -160,9 +160,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("x") - .add("y") - .add("z") + .addRow("x") + .addRow("y") + .addRow("z") .build(); zapOffset(batch, 0, 1); @@ -198,9 +198,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("xx") - .add("yy") - .add("zz") + .addRow("xx") + .addRow("yy") + .addRow("zz") .build(); zapOffset(batch, 2, 1); @@ -222,9 +222,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("xx") - .add("yy") - .add("zz") + .addRow("xx") + .addRow("yy") + .addRow("zz") .build(); zapOffset(batch, 1, 10); @@ -246,9 +246,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add("xx") - .add("yy") - .add("zz") + .addRow("xx") + .addRow("yy") + .addRow("zz") .build(); zapOffset(batch, 3, 100_000); @@ -270,9 +270,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add((Object) new String[] {}) - .add((Object) new String[] {"fred", "barney", "wilma"}) - .add((Object) new String[] {"dino"}) + .addRow((Object) new String[] {}) + .addRow((Object) new String[] {"fred", "barney", "wilma"}) + .addRow((Object) new String[] {"dino"}) .build(); VectorAccessible va = batch.vectorAccessible(); @@ -298,9 +298,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ { .build(); SingleRowSet batch = fixture.rowSetBuilder(schema) - .add((Object) new String[] {}) - .add((Object) new String[] {"fred", "barney", "wilma"}) - .add((Object) new String[] {"dino"}) + .addRow((Object) new String[] {}) + .addRow((Object) new String[] {"fred", "barney", "wilma"}) + .addRow((Object) new String[] {"dino"}) .build(); 
VectorAccessible va = batch.vectorAccessible(); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java index c52f1a9..563d97e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java @@ -60,7 +60,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema); for (int i = 0; i <= record_count; i += 2) { - rowSetBuilder.add(i); + rowSetBuilder.addRow(i); } final RowSet rowSet = rowSetBuilder.build(); @@ -76,7 +76,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema); for (int i = 1; i <= record_count; i += 2) { - rowSetBuilder.add((float) i); + rowSetBuilder.addRow((float) i); } final RowSet rowSet = rowSetBuilder.build(); @@ -131,7 +131,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema); for (int i = 0; i <= record_count; i += 2) { - rowSetBuilder.add(i); + rowSetBuilder.addRow(i); } final RowSet rowSet = rowSetBuilder.build(); @@ -147,7 +147,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema); for (int i = 1; i <= record_count; i += 2) { - rowSetBuilder.add(i); + rowSetBuilder.addRow(i); } final RowSet rowSet = rowSetBuilder.build(); @@ -199,7 +199,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new 
RowSetBuilder(allocator, schema); for (int i = 0; i <= record_count; i += 2) { - rowSetBuilder.add(i, i); + rowSetBuilder.addRow(i, i); } final RowSet rowSet = rowSetBuilder.build(); @@ -216,7 +216,7 @@ public class TestExternalSort extends BaseTestQuery { final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema); for (int i = 1; i <= record_count; i += 2) { - rowSetBuilder.add(i, i); + rowSetBuilder.addRow(i, i); } final RowSet rowSet = rowSetBuilder.build(); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java index c58abd6..cd408cb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java @@ -33,12 +33,12 @@ import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrap import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.TupleMetadata; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.DirectRowSet; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; -import org.apache.drill.test.rowSet.RowSetSchema; import org.apache.drill.test.rowSet.SchemaBuilder; import com.google.common.collect.Lists; @@ -93,7 +93,7 @@ public class SortTestUtilities { public void run() throws Exception { 
PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder); List<BatchGroup> batches = new ArrayList<>(); - RowSetSchema schema = null; + TupleMetadata schema = null; for (SingleRowSet rowSet : rowSets) { batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(), fixture.allocator(), rowSet.size())); @@ -103,7 +103,7 @@ public class SortTestUtilities { } int rowCount = outputRowCount(); VectorContainer dest = new VectorContainer(); - BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE), + BatchMerger merger = copier.startMerge(new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()), batches, dest, rowCount, null); verifyResults(merger, dest); @@ -121,7 +121,7 @@ public class SortTestUtilities { protected void verifyResults(BatchMerger merger, VectorContainer dest) { for (RowSet expectedSet : expected) { assertTrue(merger.next()); - RowSet rowSet = new DirectRowSet(fixture.allocator(), dest); + RowSet rowSet = DirectRowSet.fromContainer(dest); new RowSetComparison(expectedSet) .verifyAndClearAll(rowSet); } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java index f1c622f..5d438ee 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java @@ -29,15 +29,13 @@ import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrap import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester; import org.apache.drill.exec.record.BatchSchema; 
import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.test.DrillTest; import org.apache.drill.test.OperatorFixture; +import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetUtilities; +import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.SchemaBuilder; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,19 +49,7 @@ import org.junit.experimental.categories.Category; */ @Category(OperatorTest.class) -public class TestCopier extends DrillTest { - - public static OperatorFixture fixture; - - @BeforeClass - public static void setup() { - fixture = OperatorFixture.builder().build(); - } - - @AfterClass - public static void tearDown() throws Exception { - fixture.close(); - } +public class TestCopier extends SubOperatorTest { @Test public void testEmptyInput() throws Exception { @@ -101,12 +87,12 @@ public class TestCopier extends DrillTest { BatchSchema schema = SortTestUtilities.nonNullSchema(); CopierTester tester = new CopierTester(fixture); tester.addInput(fixture.rowSetBuilder(schema) - .add(10, "10") + .addRow(10, "10") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(10, "10") + .addRow(10, "10") .build()); tester.run(); } @@ -116,17 +102,17 @@ public class TestCopier extends DrillTest { BatchSchema schema = SortTestUtilities.nonNullSchema(); CopierTester tester = new CopierTester(fixture); tester.addInput(fixture.rowSetBuilder(schema) - .add(10, "10") + .addRow(10, "10") .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(20, "20") + .addRow(20, "20") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(10, "10") - .add(20, "20") + .addRow(10, 
"10") + .addRow(20, "20") .build()); tester.run(); } @@ -137,7 +123,7 @@ public class TestCopier extends DrillTest { int value = first; for (int i = 0; i < count; i++, value += step) { RowSetUtilities.setFromInt(writer, 0, value); - writer.column(1).setString(Integer.toString(value)); + writer.scalar(1).setString(Integer.toString(value)); writer.save(); } writer.done(); @@ -188,25 +174,25 @@ public class TestCopier extends DrillTest { tester.sortOrder = Ordering.ORDER_ASC; tester.nullOrder = Ordering.NULLS_LAST; tester.addInput(fixture.rowSetBuilder(schema) - .add(1, "1") - .add(4, "4") - .add(null, "null") + .addRow(1, "1") + .addRow(4, "4") + .addRow(null, "null") .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(2, "2") - .add(3, "3") - .add(null, "null") + .addRow(2, "2") + .addRow(3, "3") + .addRow(null, "null") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(1, "1") - .add(2, "2") - .add(3, "3") - .add(4, "4") - .add(null, "null") - .add(null, "null") + .addRow(1, "1") + .addRow(2, "2") + .addRow(3, "3") + .addRow(4, "4") + .addRow(null, "null") + .addRow(null, "null") .build()); tester.run(); @@ -220,25 +206,25 @@ public class TestCopier extends DrillTest { tester.sortOrder = Ordering.ORDER_ASC; tester.nullOrder = Ordering.NULLS_FIRST; tester.addInput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(1, "1") - .add(4, "4") + .addRow(null, "null") + .addRow(1, "1") + .addRow(4, "4") .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(2, "2") - .add(3, "3") + .addRow(null, "null") + .addRow(2, "2") + .addRow(3, "3") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(null, "null") - .add(1, "1") - .add(2, "2") - .add(3, "3") - .add(4, "4") + .addRow(null, "null") + .addRow(null, "null") + .addRow(1, "1") + .addRow(2, "2") + .addRow(3, "3") + .addRow(4, "4") .build()); tester.run(); @@ -252,25 +238,25 @@ public 
class TestCopier extends DrillTest { tester.sortOrder = Ordering.ORDER_DESC; tester.nullOrder = Ordering.NULLS_LAST; tester.addInput(fixture.rowSetBuilder(schema) - .add(4, "4") - .add(1, "1") - .add(null, "null") + .addRow(4, "4") + .addRow(1, "1") + .addRow(null, "null") .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(3, "3") - .add(2, "2") - .add(null, "null") + .addRow(3, "3") + .addRow(2, "2") + .addRow(null, "null") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(4, "4") - .add(3, "3") - .add(2, "2") - .add(1, "1") - .add(null, "null") - .add(null, "null") + .addRow(4, "4") + .addRow(3, "3") + .addRow(2, "2") + .addRow(1, "1") + .addRow(null, "null") + .addRow(null, "null") .build()); tester.run(); @@ -284,25 +270,25 @@ public class TestCopier extends DrillTest { tester.sortOrder = Ordering.ORDER_DESC; tester.nullOrder = Ordering.NULLS_FIRST; tester.addInput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(4, "4") - .add(1, "1") + .addRow(null, "null") + .addRow(4, "4") + .addRow(1, "1") .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(3, "3") - .add(2, "2") + .addRow(null, "null") + .addRow(3, "3") + .addRow(2, "2") .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(null, "null") - .add(null, "null") - .add(4, "4") - .add(3, "3") - .add(2, "2") - .add(1, "1") + .addRow(null, "null") + .addRow(null, "null") + .addRow(4, "4") + .addRow(3, "3") + .addRow(2, "2") + .addRow(1, "1") .build()); tester.run(); @@ -362,22 +348,22 @@ public class TestCopier extends DrillTest { CopierTester tester = new CopierTester(fixture); tester.addInput(fixture.rowSetBuilder(schema) - .add(1, 10, 100) - .add(5, 50, 500) + .addRow(1, new Object[] {10, new Object[] {100}}) + .addRow(5, new Object[] {50, new Object[] {500}}) .withSv2() .build()); tester.addInput(fixture.rowSetBuilder(schema) - .add(2, 20, 200) - .add(6, 60, 600) + .addRow(2, new 
Object[] {20, new Object[] {200}}) + .addRow(6, new Object[] {60, new Object[] {600}}) .withSv2() .build()); tester.addOutput(fixture.rowSetBuilder(schema) - .add(1, 10, 100) - .add(2, 20, 200) - .add(5, 50, 500) - .add(6, 60, 600) + .addRow(1, new Object[] {10, new Object[] {100}}) + .addRow(2, new Object[] {20, new Object[] {200}}) + .addRow(5, new Object[] {50, new Object[] {500}}) + .addRow(6, new Object[] {60, new Object[] {600}}) .build()); tester.run(); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java index ba5dfce..38e3698 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java @@ -61,9 +61,9 @@ public class TestShortArrays extends SubOperatorTest { .addArray("b", MinorType.INT) .build(); RowSetBuilder builder = fixture.rowSetBuilder(schema) - .add(1, new int[] {10}); + .addRow(1, new int[] {10}); for (int i = 2; i <= 10; i++) { - builder.add(i, new int[] {}); + builder.addRow(i, new int[] {}); } RowSet rows = builder.build(); @@ -87,9 +87,9 @@ public class TestShortArrays extends SubOperatorTest { SingleRowSet empty = fixture.rowSet(schema); vi.allocateBatch(empty.container(), 100); - assertEquals(2, empty.vectors().length); + assertEquals(2, empty.container().getNumberOfColumns()); @SuppressWarnings("resource") - ValueVector bVector = empty.vectors()[1]; + ValueVector bVector = empty.container().getValueVector(1).getValueVector(); assertTrue(bVector instanceof RepeatedIntVector); assertEquals(16, 
((RepeatedIntVector) bVector).getDataVector().getValueCapacity()); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java index d83a765..93411d7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java @@ -46,8 +46,8 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl; import org.apache.drill.test.rowSet.IndirectRowSet; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; -import org.apache.drill.test.rowSet.RowSet.RowSetReader; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetComparison; import org.apache.drill.test.rowSet.SchemaBuilder; @@ -193,11 +193,11 @@ public class TestSortImpl extends DrillTest { private static RowSet toRowSet(OperatorFixture fixture, SortResults results, VectorContainer dest) { if (results.getSv4() != null) { - return new HyperRowSetImpl(fixture.allocator(), dest, results.getSv4()); + return new HyperRowSetImpl(dest, results.getSv4()); } else if (results.getSv2() != null) { - return new IndirectRowSet(fixture.allocator(), dest, results.getSv2()); + return IndirectRowSet.fromSv2(dest, results.getSv2()); } else { - return new DirectRowSet(fixture.allocator(), dest); + return DirectRowSet.fromContainer(dest); } } @@ -242,10 +242,10 @@ public class TestSortImpl 
extends DrillTest { BatchSchema schema = SortTestUtilities.nonNullSchema(); SortTestFixture sortTest = new SortTestFixture(fixture); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(1, "first") + .addRow(1, "first") .build()); sortTest.addOutput(fixture.rowSetBuilder(schema) - .add(1, "first") + .addRow(1, "first") .build()); sortTest.run(); } @@ -262,12 +262,12 @@ public class TestSortImpl extends DrillTest { BatchSchema schema = SortTestUtilities.nonNullSchema(); SortTestFixture sortTest = new SortTestFixture(fixture); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(2, "second") - .add(1, "first") + .addRow(2, "second") + .addRow(1, "first") .build()); sortTest.addOutput(fixture.rowSetBuilder(schema) - .add(1, "first") - .add(2, "second") + .addRow(1, "first") + .addRow(2, "second") .build()); sortTest.run(); } @@ -285,14 +285,14 @@ public class TestSortImpl extends DrillTest { BatchSchema schema = SortTestUtilities.nonNullSchema(); SortTestFixture sortTest = new SortTestFixture(fixture); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(2, "second") + .addRow(2, "second") .build()); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(1, "first") + .addRow(1, "first") .build()); sortTest.addOutput(fixture.rowSetBuilder(schema) - .add(1, "first") - .add(2, "second") + .addRow(1, "first") + .addRow(2, "second") .build()); sortTest.run(); } @@ -356,7 +356,7 @@ public class TestSortImpl extends DrillTest { RowSetBuilder builder = fixture.rowSetBuilder(schema); int end = Math.min(batchSize, targetCount - rowCount); for (int i = 0; i < end; i++) { - builder.add(currentValue, i + ", " + currentValue); + builder.addRow(currentValue, i + ", " + currentValue); currentValue = (currentValue + step) % targetCount; rowCount++; } @@ -387,7 +387,7 @@ public class TestSortImpl extends DrillTest { RowSetReader reader = output.reader(); while (reader.next()) { assertEquals("Value of " + batchCount + ":" + rowCount, - rowCount, reader.column(0).getInt()); + 
rowCount, reader.scalar(0).getInt()); rowCount++; } } @@ -593,18 +593,18 @@ public class TestSortImpl extends DrillTest { } }; sortTest.addInput(fixture.rowSetBuilder(schema) - .add(2, "second") + .addRow(2, "second") .build()); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(3, "third") + .addRow(3, "third") .build()); sortTest.addInput(fixture.rowSetBuilder(schema) - .add(1, "first") + .addRow(1, "first") .build()); sortTest.addOutput(fixture.rowSetBuilder(schema) - .add(1, "first") - .add(2, "second") - .add(3, "third") + .addRow(1, "first") + .addRow(2, "second") + .addRow(3, "third") .build()); sortTest.run(); } http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java index 5f04da6..c24f1a6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java @@ -36,8 +36,8 @@ import org.apache.drill.test.DrillTest; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; -import org.apache.drill.test.rowSet.RowSet.RowSetReader; -import org.apache.drill.test.rowSet.RowSet.RowSetWriter; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetWriter; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetComparison; @@ -111,12 +111,12 @@ public class TestSorter extends DrillTest { public void testSingleRow() throws Exception { 
BatchSchema schema = SortTestUtilities.nonNullSchema(); SingleRowSet rowSet = new RowSetBuilder(fixture.allocator(), schema) - .add(0, "0") + .addRow(0, "0") .withSv2() .build(); SingleRowSet expected = new RowSetBuilder(fixture.allocator(), schema) - .add(0, "0") + .addRow(0, "0") .build(); runSorterTest(rowSet, expected); } @@ -127,14 +127,14 @@ public class TestSorter extends DrillTest { public void testTwoRows() throws Exception { BatchSchema schema = SortTestUtilities.nonNullSchema(); SingleRowSet rowSet = new RowSetBuilder(fixture.allocator(), schema) - .add(1, "1") - .add(0, "0") + .addRow(1, "1") + .addRow(0, "0") .withSv2() .build(); SingleRowSet expected = new RowSetBuilder(fixture.allocator(), schema) - .add(0, "0") - .add(1, "1") + .addRow(0, "0") + .addRow(1, "1") .build(); runSorterTest(rowSet, expected); } @@ -207,11 +207,11 @@ public class TestSorter extends DrillTest { for (int i = 0; i < items.length; i++) { DataItem item = items[i]; if (nullable && item.isNull) { - writer.column(0).setNull(); + writer.scalar(0).setNull(); } else { RowSetUtilities.setFromInt(writer, 0, item.key); } - writer.column(1).setString(Integer.toString(item.value)); + writer.scalar(1).setString(Integer.toString(item.value)); writer.save(); } writer.done(); @@ -221,7 +221,7 @@ public class TestSorter extends DrillTest { private void verify(RowSet actual) { DataItem expected[] = Arrays.copyOf(data, data.length); doSort(expected); - RowSet expectedRows = makeDataSet(actual.allocator(), actual.schema().batch(), expected); + RowSet expectedRows = makeDataSet(actual.allocator(), actual.batchSchema(), expected); doVerify(expected, expectedRows, actual); } @@ -369,7 +369,7 @@ public class TestSorter extends DrillTest { int mo = rand.nextInt(12); int yr = rand.nextInt(10); Period period = makePeriod(yr, mo, day, hr, min, sec, ms); - builder.add(period); + builder.addRow(period); } return builder.build(); } @@ -383,7 +383,7 @@ public class TestSorter extends DrillTest { int 
prevMonths = 0; long prevMs = 0; while (reader.next()) { - Period period = reader.column(0).getPeriod().normalizedStandard(); + Period period = reader.scalar(0).getPeriod().normalizedStandard(); int years = period.getYears(); assertTrue(prevYears <= years); if (prevYears != years) { @@ -586,16 +586,16 @@ public class TestSorter extends DrillTest { .build(); SingleRowSet input = fixture.rowSetBuilder(schema) - .add(3, "third") - .add(1, "first") - .add(2, "second") + .addRow(3, "third") + .addRow(1, "first") + .addRow(2, "second") .withSv2() .build(); SingleRowSet output = fixture.rowSetBuilder(schema) - .add(1, "first") - .add(2, "second") - .add(3, "third") + .addRow(1, "first") + .addRow(2, "second") + .addRow(3, "third") .build(); Sort popConfig = makeSortConfig("map.key", Ordering.ORDER_ASC, Ordering.NULLS_LAST); runSorterTest(popConfig, input, output); http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java new file mode 100644 index 0000000..f9f5128 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import static org.junit.Assert.*; + +import java.util.Arrays; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.Test; + +/** + * Tests of the row limit functionality of the result set loader. The + * row limit is set up front and has a default value. Because Drill must + * discover data structure as it reads, the result set loader also allows changing + * the row limit between batches (perhaps Drill discovers that rows are much + * narrower or wider than expected.) + * <p> + * The tests here are independent of the tests for vector allocation (which does, + * in fact, depend on the row count) and vector overflow (which can occur when + * the row limit turns out to be too large.) + */ + +public class TestResultSetLoaderLimits extends SubOperatorTest { + + /** + * Verify that the writer stops when reaching the row limit. + * In this case there is no look-ahead row. 
+ */ + + @Test + public void testRowLimit() { + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator()); + assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, rsLoader.targetRowCount()); + RowSetLoader rootWriter = rsLoader.writer(); + rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED)); + + byte value[] = new byte[200]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + rsLoader.startBatch(); + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, count); + assertEquals(count, rootWriter.rowCount()); + + rsLoader.harvest().clear(); + + // Do it again, a different way. + + count = 0; + rsLoader.startBatch(); + assertEquals(0, rootWriter.rowCount()); + while (rootWriter.start()) { + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, count); + assertEquals(count, rootWriter.rowCount()); + + rsLoader.harvest().clear(); + + rsLoader.close(); + } + + private static final int TEST_ROW_LIMIT = 1024; + + /** + * Verify that the caller can set a row limit lower than the default. + */ + + @Test + public void testCustomRowLimit() { + + // Try to set a default value larger than the hard limit. Value + // is truncated to the limit. + + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1) + .build(); + assertEquals(ValueVector.MAX_ROW_COUNT, options.rowCountLimit); + + // Just a bit of paranoia that we check against the vector limit, + // not any previous value... 
+ + options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1) + .setRowCountLimit(TEST_ROW_LIMIT) + .build(); + assertEquals(TEST_ROW_LIMIT, options.rowCountLimit); + + options = new OptionBuilder() + .setRowCountLimit(TEST_ROW_LIMIT) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1) + .build(); + assertEquals(ValueVector.MAX_ROW_COUNT, options.rowCountLimit); + + // Can't set the limit lower than 1 + + options = new OptionBuilder() + .setRowCountLimit(0) + .build(); + assertEquals(1, options.rowCountLimit); + + // Do load with a (valid) limit lower than the default. + + options = new OptionBuilder() + .setRowCountLimit(TEST_ROW_LIMIT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(TEST_ROW_LIMIT, rsLoader.targetRowCount()); + + RowSetLoader rootWriter = rsLoader.writer(); + rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED)); + + rsLoader.startBatch(); + int count = fillToLimit(rootWriter); + assertEquals(TEST_ROW_LIMIT, count); + assertEquals(count, rootWriter.rowCount()); + + // Should fail to write beyond the row limit + + assertFalse(rootWriter.start()); + try { + rootWriter.save(); + fail(); + } catch (IllegalStateException e) { + // Expected + } + + rsLoader.harvest().clear(); + rsLoader.startBatch(); + assertEquals(0, rootWriter.rowCount()); + + rsLoader.close(); + } + + private int fillToLimit(RowSetLoader rootWriter) { + byte value[] = new byte[200]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + return count; + } + + /** + * Test that the row limit can change between batches. + */ + + @Test + public void testDynamicLimit() { + + // Start with a small limit. 
+ + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(TEST_ROW_LIMIT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(TEST_ROW_LIMIT, rsLoader.targetRowCount()); + + RowSetLoader rootWriter = rsLoader.writer(); + rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED)); + + rsLoader.startBatch(); + int count = fillToLimit(rootWriter); + assertEquals(TEST_ROW_LIMIT, count); + assertEquals(count, rootWriter.rowCount()); + rsLoader.harvest().clear(); + + // Reset the batch size larger and fill a second batch + + int newLimit = 8000; + rsLoader.setTargetRowCount(newLimit); + rsLoader.startBatch(); + count = fillToLimit(rootWriter); + assertEquals(newLimit, count); + assertEquals(count, rootWriter.rowCount()); + rsLoader.harvest().clear(); + + // Put the limit back to a lower number. + + newLimit = 1000; + rsLoader.setTargetRowCount(newLimit); + rsLoader.startBatch(); + count = fillToLimit(rootWriter); + assertEquals(newLimit, count); + assertEquals(count, rootWriter.rowCount()); + rsLoader.harvest().clear(); + + rsLoader.close(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java new file mode 100644 index 0000000..115e52d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ArrayReader; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ScalarElementReader; +import org.apache.drill.exec.vector.accessor.ScalarReader; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleReader; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.SchemaBuilder; 
+import org.junit.Test; + +/** + * Test map array support in the result set loader. + * <p> + * The tests here should be considered in the "extra for experts" + * category: run and/or debug these tests only after the scalar + * tests work. Maps, and especially repeated maps, are very complex + * constructs not to be tackled lightly. + */ + +public class TestResultSetLoaderMapArray extends SubOperatorTest { + + @Test + public void testBasics() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMapArray("m") + .add("c", MinorType.INT) + .add("d", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Verify structure and schema + + TupleMetadata actualSchema = rootWriter.schema(); + assertEquals(2, actualSchema.size()); + assertTrue(actualSchema.metadata(1).isArray()); + assertTrue(actualSchema.metadata(1).isMap()); + assertEquals(2, actualSchema.metadata("m").mapSchema().size()); + assertEquals(2, actualSchema.column("m").getChildren().size()); + + // Write a couple of rows with arrays. 
+ + rsLoader.startBatch(); + rootWriter + .addRow(10, new Object[] { + new Object[] {110, "d1.1"}, + new Object[] {120, "d2.2"}}) + .addRow(20, new Object[] {}) + .addRow(30, new Object[] { + new Object[] {310, "d3.1"}, + new Object[] {320, "d3.2"}, + new Object[] {330, "d3.3"}}) + ; + + // Verify the first batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] { + new Object[] {110, "d1.1"}, + new Object[] {120, "d2.2"}}) + .addRow(20, new Object[] {}) + .addRow(30, new Object[] { + new Object[] {310, "d3.1"}, + new Object[] {320, "d3.2"}, + new Object[] {330, "d3.3"}}) + .build(); + new RowSetComparison(expected).verifyAndClearAll(actual); + + // In the second, create a row, then add a map member. + // Should be back-filled to empty for the first row. + + rsLoader.startBatch(); + rootWriter + .addRow(40, new Object[] { + new Object[] {410, "d4.1"}, + new Object[] {420, "d4.2"}}); + + TupleWriter mapWriter = rootWriter.array("m").tuple(); + mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL)); + + rootWriter + .addRow(50, new Object[] { + new Object[] {510, "d5.1", "e5.1"}, + new Object[] {520, "d5.2", null}}) + .addRow(60, new Object[] { + new Object[] {610, "d6.1", "e6.1"}, + new Object[] {620, "d6.2", null}, + new Object[] {630, "d6.3", "e6.3"}}) + ; + + // Verify the second batch + + actual = fixture.wrap(rsLoader.harvest()); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMapArray("m") + .add("c", MinorType.INT) + .add("d", MinorType.VARCHAR) + .addNullable("e", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + expected = fixture.rowSetBuilder(expectedSchema) + .addRow(40, new Object[] { + new Object[] {410, "d4.1", null}, + new Object[] {420, "d4.2", null}}) + .addRow(50, new Object[] { + new Object[] {510, "d5.1", "e5.1"}, + new Object[] {520, "d5.2", null}}) + .addRow(60, new Object[] { + new 
Object[] {610, "d6.1", "e6.1"}, + new Object[] {620, "d6.2", null}, + new Object[] {630, "d6.3", "e6.3"}}) + .build(); + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + @Test + public void testNestedArray() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMapArray("m") + .add("c", MinorType.INT) + .addArray("d", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Write a couple of rows with arrays within arrays. + // (And, of course, the Varchar is actually an array of + // bytes, so that's three array levels.) + + rsLoader.startBatch(); + rootWriter + .addRow(10, new Object[] { + new Object[] {110, new String[] {"d1.1.1", "d1.1.2"}}, + new Object[] {120, new String[] {"d1.2.1", "d1.2.2"}}}) + .addRow(20, new Object[] {}) + .addRow(30, new Object[] { + new Object[] {310, new String[] {"d3.1.1", "d3.2.2"}}, + new Object[] {320, new String[] {}}, + new Object[] {330, new String[] {"d3.3.1", "d1.2.2"}}}) + ; + + // Verify the batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] { + new Object[] {110, new String[] {"d1.1.1", "d1.1.2"}}, + new Object[] {120, new String[] {"d1.2.1", "d1.2.2"}}}) + .addRow(20, new Object[] {}) + .addRow(30, new Object[] { + new Object[] {310, new String[] {"d3.1.1", "d3.2.2"}}, + new Object[] {320, new String[] {}}, + new Object[] {330, new String[] {"d3.3.1", "d1.2.2"}}}) + .build(); + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Test doubly-nested arrays of maps. 
+ */ + + @Test + public void testDoubleNestedArray() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMapArray("m1") + .add("b", MinorType.INT) + .addMapArray("m2") + .add("c", MinorType.INT) + .addArray("d", MinorType.VARCHAR) + .buildMap() + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + rsLoader.startBatch(); + + ScalarWriter aWriter = rootWriter.scalar("a"); + ArrayWriter a1Writer = rootWriter.array("m1"); + TupleWriter m1Writer = a1Writer.tuple(); + ScalarWriter bWriter = m1Writer.scalar("b"); + ArrayWriter a2Writer = m1Writer.array("m2"); + TupleWriter m2Writer = a2Writer.tuple(); + ScalarWriter cWriter = m2Writer.scalar("c"); + ScalarWriter dWriter = m2Writer.array("d").scalar(); + + for (int i = 0; i < 5; i++) { + rootWriter.start(); + aWriter.setInt(i); + for (int j = 0; j < 4; j++) { + int a1Key = i + 10 + j; + bWriter.setInt(a1Key); + for (int k = 0; k < 3; k++) { + int a2Key = a1Key * 10 + k; + cWriter.setInt(a2Key); + for (int l = 0; l < 2; l++) { + dWriter.setString("d-" + (a2Key * 10 + l)); + } + a2Writer.save(); + } + a1Writer.save(); + } + rootWriter.save(); + } + + RowSet results = fixture.wrap(rsLoader.harvest()); + RowSetReader reader = results.reader(); + + ScalarReader aReader = reader.scalar("a"); + ArrayReader a1Reader = reader.array("m1"); + TupleReader m1Reader = a1Reader.tuple(); + ScalarReader bReader = m1Reader.scalar("b"); + ArrayReader a2Reader = m1Reader.array("m2"); + TupleReader m2Reader = a2Reader.tuple(); + ScalarReader cReader = m2Reader.scalar("c"); + ScalarElementReader dReader = m2Reader.elements("d"); + + for (int i = 0; i < 5; i++) { + reader.next(); + assertEquals(i, aReader.getInt()); + for (int j = 0; j < 4; j++) { + a1Reader.setPosn(j); + int a1Key = i + 10 + j; + 
assertEquals(a1Key, bReader.getInt()); + for (int k = 0; k < 3; k++) { + a2Reader.setPosn(k); + int a2Key = a1Key * 10 + k; + assertEquals(a2Key, cReader.getInt()); + for (int l = 0; l < 2; l++) { + assertEquals("d-" + (a2Key * 10 + l), dReader.getString(l)); + } + } + } + } + rsLoader.close(); + } + + /** + * Version of the {#link TestResultSetLoaderProtocol#testOverwriteRow()} test + * that uses nested columns inside an array of maps. Here we must call + * <tt>start()</tt> to reset the array back to the initial start position after + * each "discard." + */ + + @Test + public void testOverwriteRow() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMapArray("m") + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Can't use the shortcut to populate rows when doing overwrites. + + ScalarWriter aWriter = rootWriter.scalar("a"); + ArrayWriter maWriter = rootWriter.array("m"); + TupleWriter mWriter = maWriter.tuple(); + ScalarWriter bWriter = mWriter.scalar("b"); + ScalarWriter cWriter = mWriter.scalar("c"); + + // Write 100,000 rows, overwriting 99% of them. This will cause vector + // overflow and data corruption if overwrite does not work; but will happily + // produce the correct result if everything works as it should. 
+ + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + rsLoader.startBatch(); + while (count < 10_000) { + rootWriter.start(); + count++; + aWriter.setInt(count); + for (int i = 0; i < 10; i++) { + bWriter.setInt(count * 10 + i); + cWriter.setBytes(value, value.length); + maWriter.save(); + } + if (count % 100 == 0) { + rootWriter.save(); + } + } + + // Verify using a reader. + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(count / 100, result.rowCount()); + RowSetReader reader = result.reader(); + ArrayReader maReader = reader.array("m"); + TupleReader mReader = maReader.tuple(); + int rowId = 1; + while (reader.next()) { + assertEquals(rowId * 100, reader.scalar("a").getInt()); + assertEquals(10, maReader.size()); + for (int i = 0; i < 10; i++) { + maReader.setPosn(i); + assertEquals(rowId * 1000 + i, mReader.scalar("b").getInt()); + assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes())); + } + rowId++; + } + + result.clear(); + rsLoader.close(); + } + + /** + * Check that the "fill-empties" logic descends down into + * a repeated map. 
+ */ + + @Test + public void testOmittedValues() { + TupleMetadata schema = new SchemaBuilder() + .add("id", MinorType.INT) + .addMapArray("m") + .addNullable("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + int mapSkip = 5; + int entrySkip = 3; + int rowCount = 1000; + int entryCount = 10; + + rsLoader.startBatch(); + ArrayWriter maWriter = rootWriter.array("m"); + TupleWriter mWriter = maWriter.tuple(); + for (int i = 0; i < rowCount; i++) { + rootWriter.start(); + rootWriter.scalar(0).setInt(i); + if (i % mapSkip != 0) { + for (int j = 0; j < entryCount; j++) { + if (j % entrySkip != 0) { + mWriter.scalar(0).setInt(i * entryCount + j); + mWriter.scalar(1).setString("b-" + i + "." + j); + } + maWriter.save(); + } + } + rootWriter.save(); + } + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(rowCount, result.rowCount()); + RowSetReader reader = result.reader(); + ArrayReader maReader = reader.array("m"); + TupleReader mReader = maReader.tuple(); + for (int i = 0; i < rowCount; i++) { + assertTrue(reader.next()); + assertEquals(i, reader.scalar(0).getInt()); + if (i % mapSkip == 0) { + assertEquals(0, maReader.size()); + continue; + } + assertEquals(entryCount, maReader.size()); + for (int j = 0; j < entryCount; j++) { + maReader.setPosn(j); + if (j % entrySkip == 0) { + assertTrue(mReader.scalar(0).isNull()); + assertTrue(mReader.scalar(1).isNull()); + } else { + assertFalse(mReader.scalar(0).isNull()); + assertFalse(mReader.scalar(1).isNull()); + assertEquals(i * entryCount + j, mReader.scalar(0).getInt()); + assertEquals("b-" + i + "." 
+ j, mReader.scalar(1).getString()); + } + } + } + result.clear(); + rsLoader.close(); + } + + /** + * Test that memory is released if the loader is closed with an active + * batch (that is, before the batch is harvested.) + */ + + @Test + public void testCloseWithoutHarvest() { + TupleMetadata schema = new SchemaBuilder() + .addMapArray("m") + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + ArrayWriter maWriter = rootWriter.array("m"); + TupleWriter mWriter = maWriter.tuple(); + rsLoader.startBatch(); + for (int i = 0; i < 40; i++) { + rootWriter.start(); + for (int j = 0; j < 3; j++) { + mWriter.scalar("a").setInt(i); + mWriter.scalar("b").setString("b-" + i); + maWriter.save(); + } + rootWriter.save(); + } + + // Don't harvest the batch. Allocator will complain if the + // loader does not release memory. + + rsLoader.close(); + } +}