http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java new file mode 100644 index 0000000..b23eb0d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java @@ -0,0 +1,810 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleReader; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.Test; + +/** + * Test (non-array) map support in the result set loader and related classes. 
+ */ + +public class TestResultSetLoaderMaps extends SubOperatorTest { + + @Test + public void testBasics() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("c", MinorType.INT) + .add("d", MinorType.VARCHAR) + .buildMap() + .add("e", MinorType.VARCHAR) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Verify structure and schema + + assertEquals(5, rsLoader.schemaVersion()); + TupleMetadata actualSchema = rootWriter.schema(); + assertEquals(3, actualSchema.size()); + assertTrue(actualSchema.metadata(1).isMap()); + assertEquals(2, actualSchema.metadata("m").mapSchema().size()); + assertEquals(2, actualSchema.column("m").getChildren().size()); + + rsLoader.startBatch(); + + // Write a row the way that clients will do. + + ScalarWriter aWriter = rootWriter.scalar("a"); + TupleWriter mWriter = rootWriter.tuple("m"); + ScalarWriter cWriter = mWriter.scalar("c"); + ScalarWriter dWriter = mWriter.scalar("d"); + ScalarWriter eWriter = rootWriter.scalar("e"); + + rootWriter.start(); + aWriter.setInt(10); + cWriter.setInt(110); + dWriter.setString("fred"); + eWriter.setString("pebbles"); + rootWriter.save(); + + // Try adding a duplicate column. + + try { + mWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL)); + fail(); + } catch (IllegalArgumentException e) { + // Expected + } + + // Write another using the test-time conveniences + + rootWriter.addRow(20, new Object[] {210, "barney"}, "bam-bam"); + + // Harvest the batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + assertEquals(5, rsLoader.schemaVersion()); + assertEquals(2, actual.rowCount()); + + // Validate data + + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] {110, "fred"}, "pebbles") + .addRow(20, new Object[] {210, "barney"}, "bam-bam") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + rsLoader.close(); + } + + /** + * Create schema with a map, then add columns to the map + * after delivering the first batch. The new columns should appear + * in the second-batch output. + */ + + @Test + public void testMapEvolution() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(3, rsLoader.schemaVersion()); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + rootWriter + .addRow(10, new Object[] {"fred"}) + .addRow(20, new Object[] {"barney"}); + + RowSet actual = fixture.wrap(rsLoader.harvest()); + assertEquals(3, rsLoader.schemaVersion()); + assertEquals(2, actual.rowCount()); + + // Validate first batch + + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] {"fred"}) + .addRow(20, new Object[] {"barney"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + // Add three columns in the second batch. One before + // the batch starts, one before the first row, and one after + // the first row. 
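+ // (Aside: each column added on the fly bumps the loader's schema version; + // the assertions in this test move from version 3 to 6 as the three new + // columns arrive.)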
+ + TupleWriter mapWriter = rootWriter.tuple("m"); + mapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED)); + + rsLoader.startBatch(); + mapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.BIGINT, DataMode.REQUIRED)); + + rootWriter.addRow(30, new Object[] {"wilma", 130, 130_000L}); + + mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED)); + rootWriter.addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"}); + + actual = fixture.wrap(rsLoader.harvest()); + assertEquals(6, rsLoader.schemaVersion()); + assertEquals(2, actual.rowCount()); + + // Validate second batch + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.VARCHAR) + .add("c", MinorType.INT) + .add("d", MinorType.BIGINT) + .add("e", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + expected = fixture.rowSetBuilder(expectedSchema) + .addRow(30, new Object[] {"wilma", 130, 130_000L, ""}) + .addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Test adding a map to a loader after writing the first row. + */ + + @Test + public void testMapAddition() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(1, rsLoader.schemaVersion()); + RowSetLoader rootWriter = rsLoader.writer(); + + // Start without the map. Add a map after the first row. + + rsLoader.startBatch(); + rootWriter.addRow(10); + + int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED)); + TupleWriter mapWriter = rootWriter.tuple(mapIndex); + + // Add a column to the map with the same name as the top-level column. + // Verifies that the namespaces are independent. + + mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED)); + + rootWriter + .addRow(20, new Object[]{"fred"}) + .addRow(30, new Object[]{"barney"}); + + RowSet actual = fixture.wrap(rsLoader.harvest()); + assertEquals(3, rsLoader.schemaVersion()); + assertEquals(3, actual.rowCount()); + + // Validate first batch + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("a", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, new Object[] {""}) + .addRow(20, new Object[] {"fred"}) + .addRow(30, new Object[] {"barney"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Test adding an empty map to a loader after writing the first row. + * Then add columns in another batch. Yes, this is a bizarre condition, + * but we must check it anyway for robustness. + */ + + @Test + public void testEmptyMapAddition() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(1, rsLoader.schemaVersion()); + RowSetLoader rootWriter = rsLoader.writer(); + + // Start without the map. Add a map after the first row.
+ + rsLoader.startBatch(); + rootWriter.addRow(10); + + int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED)); + TupleWriter mapWriter = rootWriter.tuple(mapIndex); + + rootWriter + .addRow(20, new Object[]{}) + .addRow(30, new Object[]{}); + + RowSet actual = fixture.wrap(rsLoader.harvest()); + assertEquals(2, rsLoader.schemaVersion()); + assertEquals(3, actual.rowCount()); + + // Validate first batch + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .buildMap() + .buildSchema(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, new Object[] {}) + .addRow(20, new Object[] {}) + .addRow(30, new Object[] {}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + // Now add a column to the map + + rsLoader.startBatch(); + mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED)); + + rootWriter + .addRow(40, new Object[]{"fred"}) + .addRow(50, new Object[]{"barney"}); + + actual = fixture.wrap(rsLoader.harvest()); + assertEquals(3, rsLoader.schemaVersion()); + assertEquals(2, actual.rowCount()); + + // Validate second batch + + expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("a", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + expected = fixture.rowSetBuilder(expectedSchema) + .addRow(40, new Object[] {"fred"}) + .addRow(50, new Object[] {"barney"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Create nested maps. Then, add columns to each map + * on the fly. Use required, variable-width columns since + * those require the most processing and are most likely to + * fail if anything is out of place. + */ + + @Test + public void testNestedMapsRequired() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m1") + .add("b", MinorType.VARCHAR) + .addMap("m2") + .add("c", MinorType.VARCHAR) + .buildMap() + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(5, rsLoader.schemaVersion()); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}}); + + // Validate first batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + assertEquals(5, rsLoader.schemaVersion()); + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] {"b1", new Object[] {"c1"}}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + // Now add columns in the second batch. + + rsLoader.startBatch(); + rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}}); + + TupleWriter m1Writer = rootWriter.tuple("m1"); + m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED)); + TupleWriter m2Writer = m1Writer.tuple("m2"); + m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED)); + + rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"}); + + // And another set while the write proceeds.
+ + m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REQUIRED)); + m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.REQUIRED)); + + rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"}); + + // Validate second batch + + actual = fixture.wrap(rsLoader.harvest()); + assertEquals(9, rsLoader.schemaVersion()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m1") + .add("b", MinorType.VARCHAR) + .addMap("m2") + .add("c", MinorType.VARCHAR) + .add("e", MinorType.VARCHAR) + .add("g", MinorType.VARCHAR) + .buildMap() + .add("d", MinorType.VARCHAR) + .add("f", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + expected = fixture.rowSetBuilder(expectedSchema) + .addRow(20, new Object[] {"b2", new Object[] {"c2", "", "" }, "", "" }) + .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", "" }, "d3", "" }) + .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Create nested maps. Then, add columns to each map + * on the fly. This time, with nullable types. + */ + + @Test + public void testNestedMapsNullable() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m1") + .addNullable("b", MinorType.VARCHAR) + .addMap("m2") + .addNullable("c", MinorType.VARCHAR) + .buildMap() + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}}); + + // Validate first batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] {"b1", new Object[] {"c1"}}) + .build(); +// actual.print(); +// expected.print(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + // Now add columns in the second batch. + + rsLoader.startBatch(); + rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}}); + + TupleWriter m1Writer = rootWriter.tuple("m1"); + m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL)); + TupleWriter m2Writer = m1Writer.tuple("m2"); + m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL)); + + rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"}); + + // And another set while the write proceeds. 
+ + m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.OPTIONAL)); + m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.OPTIONAL)); + + rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"}); + + // Validate second batch + + actual = fixture.wrap(rsLoader.harvest()); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m1") + .addNullable("b", MinorType.VARCHAR) + .addMap("m2") + .addNullable("c", MinorType.VARCHAR) + .addNullable("e", MinorType.VARCHAR) + .addNullable("g", MinorType.VARCHAR) + .buildMap() + .addNullable("d", MinorType.VARCHAR) + .addNullable("f", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + expected = fixture.rowSetBuilder(expectedSchema) + .addRow(20, new Object[] {"b2", new Object[] {"c2", null, null}, null, null}) + .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", null}, "d3", null}) + .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Test a map that contains a scalar array. No reason to suspect that this + * will have problems as the array writer is fully tested in the accessor + * subsystem. Still, need to test the cardinality methods of the loader + * layer. + */ + + @Test + public void testMapWithArray() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .addArray("c", MinorType.INT) + .addArray("d", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Write some rows + + rsLoader.startBatch(); + rootWriter + .addRow(10, new Object[] {new int[] {110, 120, 130}, + new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}}) + .addRow(20, new Object[] {new int[] {210}, new String[] {}}) + .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}}) + ; + + // Validate first batch + + RowSet actual = fixture.wrap(rsLoader.harvest()); + SingleRowSet expected = fixture.rowSetBuilder(schema) + .addRow(10, new Object[] {new int[] {110, 120, 130}, + new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}}) + .addRow(20, new Object[] {new int[] {210}, new String[] {}}) + .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}}) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + // Add another array after the first two rows of the second batch. + + rsLoader.startBatch(); + rootWriter + .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}}) + .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}}) + ; + + TupleWriter mapWriter = rootWriter.tuple("m"); + mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED)); + rootWriter + .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}}) + .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}}) + ; + + // Validate second batch. The new array should have been back-filled with + // empty offsets for the missing rows.
+ + actual = fixture.wrap(rsLoader.harvest()); +// System.out.println(actual.schema().toString()); + expected = fixture.rowSetBuilder(actual.schema()) + .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}, new String[] {}}) + .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}, new String[] {}}) + .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}}) + .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}}) + .build(); +// expected.print(); + + new RowSetComparison(expected).verifyAndClearAll(actual); + + rsLoader.close(); + } + + /** + * Create a schema with a map, then trigger an overflow on one of the columns + * in the map. Proper overflow handling should occur regardless of nesting + * depth. + */ + + @Test + public void testMapWithOverflow() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m1") + .add("b", MinorType.INT) + .addMap("m2") + .add("c", MinorType.INT) // Before overflow, written + .add("d", MinorType.VARCHAR) + .add("e", MinorType.INT) // After overflow, not yet written + .buildMap() + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + rsLoader.startBatch(); + while (! rootWriter.isFull()) { + rootWriter.addRow(count, new Object[] {count * 10, new Object[] {count * 100, value, count * 1000}}); + count++; + } + + // Our row count should include the overflow row + + int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length; + assertEquals(expectedCount + 1, count); + + // Loader's row count should include only "visible" rows + + assertEquals(expectedCount, rootWriter.rowCount()); + + // Total count should include invisible and look-ahead rows. + + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + + // Result should exclude the overflow row + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(expectedCount, result.rowCount()); + result.clear(); + + // Next batch should start with the overflow row + + rsLoader.startBatch(); + assertEquals(1, rootWriter.rowCount()); + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + result = fixture.wrap(rsLoader.harvest()); + assertEquals(1, result.rowCount()); + result.clear(); + + rsLoader.close(); + } + + /** + * Test the case in which a new column is added during the overflow row. Unlike + * the top-level schema case, internally we must create a copy of the map, and + * move vectors across only when the result is to include the schema version + * of the target column. For overflow, the new column is added after the + * first batch; it is added in the second batch that contains the overflow + * row in which the column was added. 
*/ + + @Test + public void testMapOverflowWithNewColumn() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + assertEquals(4, rsLoader.schemaVersion()); + RowSetLoader rootWriter = rsLoader.writer(); + + // Can't use the shortcut to populate rows when doing a schema + // change. + + ScalarWriter aWriter = rootWriter.scalar("a"); + TupleWriter mWriter = rootWriter.tuple("m"); + ScalarWriter bWriter = mWriter.scalar("b"); + ScalarWriter cWriter = mWriter.scalar("c"); + + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + rsLoader.startBatch(); + while (! rootWriter.isFull()) { + rootWriter.start(); + aWriter.setInt(count); + bWriter.setInt(count * 10); + cWriter.setBytes(value, value.length); + if (rootWriter.isFull()) { + + // Overflow just occurred. Add another column. + + mWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.OPTIONAL)); + mWriter.scalar("d").setInt(count * 100); + } + rootWriter.save(); + count++; + } + + // Result set should include the original columns, but not d. + + RowSet result = fixture.wrap(rsLoader.harvest()); + + assertEquals(4, rsLoader.schemaVersion()); + assertTrue(schema.isEquivalent(result.schema())); + BatchSchema expectedSchema = new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()); + assertTrue(expectedSchema.isEquivalent(result.batchSchema())); + + // Use a reader to validate row-by-row. Too large to create an expected + // result set. + + RowSetReader reader = result.reader(); + TupleReader mapReader = reader.tuple("m"); + int rowId = 0; + while (reader.next()) { + assertEquals(rowId, reader.scalar("a").getInt()); + assertEquals(rowId * 10, mapReader.scalar("b").getInt()); + assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes())); + rowId++; + } + result.clear(); + + // Next batch should start with the overflow row + + rsLoader.startBatch(); + assertEquals(1, rootWriter.rowCount()); + result = fixture.wrap(rsLoader.harvest()); + assertEquals(1, result.rowCount()); + + reader = result.reader(); + mapReader = reader.tuple("m"); + while (reader.next()) { + assertEquals(rowId, reader.scalar("a").getInt()); + assertEquals(rowId * 10, mapReader.scalar("b").getInt()); + assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes())); + assertEquals(rowId * 100, mapReader.scalar("d").getInt()); + } + result.clear(); + + rsLoader.close(); + } + + /** + * Version of the {@link TestResultSetLoaderProtocol#testOverwriteRow()} test + * that uses nested columns. + */ + + @Test + public void testOverwriteRow() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .buildMap() + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Can't use the shortcut to populate rows when doing overwrites.
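+ // (The "shortcut" is rootWriter.addRow(), which saves each row implicitly. + // An overwrite calls start() for every row but save() only for the rows to + // keep, so the individual column writers are used directly.)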
+ + ScalarWriter aWriter = rootWriter.scalar("a"); + TupleWriter mWriter = rootWriter.tuple("m"); + ScalarWriter bWriter = mWriter.scalar("b"); + ScalarWriter cWriter = mWriter.scalar("c"); + + // Write 100,000 rows, overwriting 99% of them. This will cause vector + // overflow and data corruption if overwrite does not work; but will happily + // produce the correct result if everything works as it should. + + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + rsLoader.startBatch(); + while (count < 100_000) { + rootWriter.start(); + count++; + aWriter.setInt(count); + bWriter.setInt(count * 10); + cWriter.setBytes(value, value.length); + if (count % 100 == 0) { + rootWriter.save(); + } + } + + // Verify using a reader. + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(count / 100, result.rowCount()); + RowSetReader reader = result.reader(); + TupleReader mReader = reader.tuple("m"); + int rowId = 1; + while (reader.next()) { + assertEquals(rowId * 100, reader.scalar("a").getInt()); + assertEquals(rowId * 1000, mReader.scalar("b").getInt()); + assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes())); + rowId++; + } + + result.clear(); + rsLoader.close(); + } +}
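All of the map tests above drive the same write/harvest cycle. As a condensed reference, the protocol looks roughly like this (a sketch assembled from the calls exercised in the file above, reusing its fixture and builder types rather than introducing new API):

    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addMap("m")
          .add("c", MinorType.INT)
          .buildMap()
        .buildSchema();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(),
        new OptionBuilder().setSchema(schema).build());
    RowSetLoader rootWriter = rsLoader.writer();

    rsLoader.startBatch();                            // begin a batch
    rootWriter.start();                               // begin a row
    rootWriter.scalar("a").setInt(10);
    rootWriter.tuple("m").scalar("c").setInt(110);    // map members write like top-level columns
    rootWriter.save();                                // commit the row
    RowSet batch = fixture.wrap(rsLoader.harvest());  // hand the batch downstream
    batch.clear();                                    // release buffers after verification
    rsLoader.close();

Columns may be added at any point in this cycle via addColumn(), on the root writer or on any map's TupleWriter, as the tests above demonstrate.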
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java new file mode 100644 index 0000000..2c4c87b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.Test; + +public class TestResultSetLoaderOmittedValues extends SubOperatorTest { + + /** + * Test "holes" in the middle of a batch, and unset columns at + * the end. Ending the batch should fill in missing values. 
+ */ + + @Test + public void testOmittedValuesAtEnd() { + + // Create columns up front + + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .addNullable("c", MinorType.VARCHAR) + .add("d", MinorType.INT) + .addNullable("e", MinorType.INT) + .addArray("f", MinorType.VARCHAR) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + int rowCount = 0; + ScalarWriter arrayWriter; + for (int i = 0; i < 2; i++) { // Row 0, 1 + rootWriter.start(); + rowCount++; + rootWriter.scalar(0).setInt(rowCount); + rootWriter.scalar(1).setString("b_" + rowCount); + rootWriter.scalar(2).setString("c_" + rowCount); + rootWriter.scalar(3).setInt(rowCount * 10); + rootWriter.scalar(4).setInt(rowCount * 100); + arrayWriter = rootWriter.column(5).array().scalar(); + arrayWriter.setString("f_" + rowCount + "-1"); + arrayWriter.setString("f_" + rowCount + "-2"); + rootWriter.save(); + } + + // Holes in half the columns + + for (int i = 0; i < 2; i++) { // Rows 2, 3 + rootWriter.start(); + rowCount++; + rootWriter.scalar(0).setInt(rowCount); + rootWriter.scalar(1).setString("b_" + rowCount); + rootWriter.scalar(3).setInt(rowCount * 10); + arrayWriter = rootWriter.column(5).array().scalar(); + arrayWriter.setString("f_" + rowCount + "-1"); + arrayWriter.setString("f_" + rowCount + "-2"); + rootWriter.save(); + } + + // Holes in the other half + + for (int i = 0; i < 2; i++) { // Rows 4, 5 + rootWriter.start(); + rowCount++; + rootWriter.scalar(0).setInt(rowCount); + rootWriter.scalar(2).setString("c_" + rowCount); + rootWriter.scalar(4).setInt(rowCount * 100); + rootWriter.save(); + } + + // All columns again. + + for (int i = 0; i < 2; i++) { // Rows 6, 7 + rootWriter.start(); + rowCount++; + rootWriter.scalar(0).setInt(rowCount); + rootWriter.scalar(1).setString("b_" + rowCount); + rootWriter.scalar(2).setString("c_" + rowCount); + rootWriter.scalar(3).setInt(rowCount * 10); + rootWriter.scalar(4).setInt(rowCount * 100); + arrayWriter = rootWriter.column(5).array().scalar(); + arrayWriter.setString("f_" + rowCount + "-1"); + arrayWriter.setString("f_" + rowCount + "-2"); + rootWriter.save(); + } + + // Omit all but the key column at end + + for (int i = 0; i < 2; i++) { // Rows 8, 9 + rootWriter.start(); + rowCount++; + rootWriter.scalar(0).setInt(rowCount); + rootWriter.save(); + } + + // Harvest the row and verify. 
+ + RowSet actual = fixture.wrap(rsLoader.harvest()); +// actual.print(); + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .addNullable("c", MinorType.VARCHAR) + .add("d", MinorType.INT) + .addNullable("e", MinorType.INT) + .addArray("f", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow( 1, "b_1", "c_1", 10, 100, new String[] {"f_1-1", "f_1-2"}) + .addRow( 2, "b_2", "c_2", 20, 200, new String[] {"f_2-1", "f_2-2"}) + .addRow( 3, "b_3", null, 30, null, new String[] {"f_3-1", "f_3-2"}) + .addRow( 4, "b_4", null, 40, null, new String[] {"f_4-1", "f_4-2"}) + .addRow( 5, "", "c_5", 0, 500, new String[] {}) + .addRow( 6, "", "c_6", 0, 600, new String[] {}) + .addRow( 7, "b_7", "c_7", 70, 700, new String[] {"f_7-1", "f_7-2"}) + .addRow( 8, "b_8", "c_8", 80, 800, new String[] {"f_8-1", "f_8-2"}) + .addRow( 9, "", null, 0, null, new String[] {}) + .addRow( 10, "", null, 0, null, new String[] {}) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(actual); + rsLoader.close(); + } + + /** + * Test "holes" at the end of a batch when the batch overflows. The completed + * batch must be finalized correctly, and the new batch initialized correctly, + * with the missing values filled in. + */ + + @Test + public void testOmittedValuesAtEndWithOverflow() { + TupleMetadata schema = new SchemaBuilder() + // Row index + .add("a", MinorType.INT) + // Column that forces overflow + .add("b", MinorType.VARCHAR) + // Column with all holes + .addNullable("c", MinorType.VARCHAR) + // Column with some holes + .addNullable("d", MinorType.VARCHAR) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Fill the batch. Column d has some values. Column c is worst case: no values. + + rsLoader.startBatch(); + byte value[] = new byte[533]; + Arrays.fill(value, (byte) 'X'); + int rowNumber = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rowNumber++; + rootWriter.scalar(0).setInt(rowNumber); + rootWriter.scalar(1).setBytes(value, value.length); + if (rowNumber < 10_000) { + rootWriter.scalar(3).setString("d-" + rowNumber); + } + rootWriter.save(); + assertEquals(rowNumber, rsLoader.totalRowCount()); + } + + // Harvest and verify + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(rowNumber - 1, result.rowCount()); + RowSetReader reader = result.reader(); + int rowIndex = 0; + while (reader.next()) { + int expectedRowNumber = 1 + rowIndex; + assertEquals(expectedRowNumber, reader.scalar(0).getInt()); + assertTrue(reader.scalar(2).isNull()); + if (expectedRowNumber < 10_000) { + assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString()); + } else { + assertTrue(reader.scalar(3).isNull()); + } + rowIndex++; + } + + // Start count for this batch is one less than current + // count, because of the overflow row.
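+ // (Concretely: if overflow occurred on row n, the batch harvested above + // held rows 1 .. n-1, totalRowCount() reports n, and row n carries over to + // become the first row of the batch started below.)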
+ + int startRowNumber = rowNumber; + + // Write a few more rows to the next batch + + rsLoader.startBatch(); + for (int i = 0; i < 10; i++) { + rootWriter.start(); + rowNumber++; + rootWriter.scalar(0).setInt(rowNumber); + rootWriter.scalar(1).setBytes(value, value.length); + if (i > 5) { + rootWriter.scalar(3).setString("d-" + rowNumber); + } + rootWriter.save(); + assertEquals(rowNumber, rsLoader.totalRowCount()); + } + + // Verify that holes were preserved. + + result = fixture.wrap(rsLoader.harvest()); + assertEquals(rowNumber, rsLoader.totalRowCount()); + assertEquals(rowNumber - startRowNumber + 1, result.rowCount()); +// result.print(); + reader = result.reader(); + rowIndex = 0; + while (reader.next()) { + int expectedRowNumber = startRowNumber + rowIndex; + assertEquals(expectedRowNumber, reader.scalar(0).getInt()); + assertTrue(reader.scalar(2).isNull()); + if (rowIndex > 6) { + assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString()); + } else { + assertTrue("Row " + rowIndex + " col d should be null", reader.scalar(3).isNull()); + } + rowIndex++; + } + assertEquals(rowIndex, 11); + + rsLoader.close(); + } + + /** + * Test that omitting the call to saveRow() effectively discards + * the row. Note that the vectors still contain values in the + * discarded position; just the various pointers are unset. If + * the batch ends before the discarded values are overwritten, the + * discarded values just exist at the end of the vector. Since vectors + * start with garbage contents, the discarded values are simply a different + * kind of garbage. But, if the client writes a new row, then the new + * row overwrites the discarded row. This works because we only change + * the tail part of a vector; never the internals. + */ + + @Test + public void testSkipRows() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + int rowNumber = 0; + for (int i = 0; i < 14; i++) { + rootWriter.start(); + rowNumber++; + rootWriter.scalar(0).setInt(rowNumber); + if (i % 3 == 0) { + rootWriter.scalar(1).setNull(); + } else { + rootWriter.scalar(1).setString("b-" + rowNumber); + } + if (i % 2 == 0) { + rootWriter.save(); + } + } + + RowSet result = fixture.wrap(rsLoader.harvest()); +// result.print(); + SingleRowSet expected = fixture.rowSetBuilder(result.batchSchema()) + .addRow( 1, null) + .addRow( 3, "b-3") + .addRow( 5, "b-5") + .addRow( 7, null) + .addRow( 9, "b-9") + .addRow(11, "b-11") + .addRow(13, null) + .build(); +// expected.print(); + new RowSetComparison(expected) + .verifyAndClearAll(result); + + rsLoader.close(); + } + + /** + * Test that discarding a row works even if that row happens to be an + * overflow row. 
+ */ + + @Test + public void testSkipOverflowRow() { + TupleMetadata schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setInt(count); + rootWriter.scalar(1).setBytes(value, value.length); + + // Relies on fact that isFull becomes true right after + // a vector overflows; don't have to wait for saveRow(). + // Keep all rows, but discard the overflow row. + + if (! rootWriter.isFull()) { + rootWriter.save(); + } + count++; + } + + // Discard the results. + + rsLoader.harvest().zeroVectors(); + + // Harvest the next batch. Will be empty (because overflow row + // was discarded.) + + rsLoader.startBatch(); + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(0, result.rowCount()); + result.clear(); + + rsLoader.close(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java new file mode 100644 index 0000000..0146cfe --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java @@ -0,0 +1,680 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ScalarElementReader; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.Test; + +import com.google.common.base.Charsets; + +/** + * Exercise the vector overflow functionality for the result set loader. + */ + +public class TestResultSetLoaderOverflow extends SubOperatorTest { + + /** + * Test that the writer detects a vector overflow. The offending column + * value should be moved to the next batch. + */ + + @Test + public void testVectorSizeLimit() { + TupleMetadata schema = new SchemaBuilder() + .add("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + + // Number of rows should be driven by vector size. + // Our row count should include the overflow row + + int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length; + assertEquals(expectedCount + 1, count); + + // Loader's row count should include only "visible" rows + + assertEquals(expectedCount, rootWriter.rowCount()); + + // Total count should include invisible and look-ahead rows. + + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + + // Result should exclude the overflow row + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(expectedCount, result.rowCount()); + result.clear(); + + // Next batch should start with the overflow row + + rsLoader.startBatch(); + assertEquals(1, rootWriter.rowCount()); + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + result = fixture.wrap(rsLoader.harvest()); + assertEquals(1, result.rowCount()); + result.clear(); + + rsLoader.close(); + } + + /** + * Test that the writer detects a vector overflow. The offending column + * value should be moved to the next batch. 
+ */ + + @Test + public void testBatchSizeLimit() { + TupleMetadata schema = new SchemaBuilder() + .add("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .setBatchSizeLimit( + 8 * 1024 * 1024 + // Data + 2 * ValueVector.MAX_ROW_COUNT * 4) // Offsets, doubled because of +1 + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + + // Our row count should include the overflow row + + int expectedCount = 8 * 1024 * 1024 / value.length; + assertEquals(expectedCount + 1, count); + + // Loader's row count should include only "visible" rows + + assertEquals(expectedCount, rootWriter.rowCount()); + + // Total count should include invisible and look-ahead rows. + + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + + // Result should exclude the overflow row + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(expectedCount, result.rowCount()); + result.clear(); + + // Next batch should start with the overflow row + + rsLoader.startBatch(); + assertEquals(1, rootWriter.rowCount()); + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + result = fixture.wrap(rsLoader.harvest()); + assertEquals(1, result.rowCount()); + result.clear(); + + rsLoader.close(); + } + + /** + * Load a batch to overflow. Then, close the loader with the overflow + * batch unharvested. The Loader should release the memory allocated + * to the unused overflow vectors. + */ + + @Test + public void testCloseWithOverflow() { + TupleMetadata schema = new SchemaBuilder() + .add("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + int count = 0; + while (! rootWriter.isFull()) { + rootWriter.start(); + rootWriter.scalar(0).setBytes(value, value.length); + rootWriter.save(); + count++; + } + + assertTrue(count < ValueVector.MAX_ROW_COUNT); + + // Harvest the full batch + + RowSet result = fixture.wrap(rsLoader.harvest()); + result.clear(); + + // Close without harvesting the overflow batch. + + rsLoader.close(); + } + + /** + * Case where a single array fills up the vector to the maximum size + * limit. Overflow won't work here; the attempt will fail with a user + * exception. + */ + + @Test + public void testOversizeArray() { + TupleMetadata schema = new SchemaBuilder() + .addArray("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Create a single array as the column value in the first row. When + // this overflows, an exception is thrown since overflow is not possible. 
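+ // (Overflow normally works by moving the in-flight row to a look-ahead + // batch, but a single value larger than the maximum vector size cannot fit + // in any batch, so the loader reports an error instead; see the message + // check below.)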
+ + rsLoader.startBatch(); + byte value[] = new byte[473]; + Arrays.fill(value, (byte) 'X'); + rootWriter.start(); + ScalarWriter array = rootWriter.array(0).scalar(); + try { + for (int i = 0; i < ValueVector.MAX_ROW_COUNT; i++) { + array.setBytes(value, value.length); + } + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains("column value is larger than the maximum")); + } + rsLoader.close(); + } + + /** + * Test a row with a single array column which overflows. Verifies + * that all the fiddly bits about offset vectors and so on work + * correctly. Run this test (the simplest case) if you change anything + * about the array handling code. + */ + + @Test + public void testSizeLimitOnArray() { + TupleMetadata schema = new SchemaBuilder() + .addArray("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Fill the batch with rows that each hold a single array of 13 values. Tack on + // a suffix to each so we can be sure the proper data is written and moved + // to the overflow batch. + + rsLoader.startBatch(); + byte value[] = new byte[473]; + Arrays.fill(value, (byte) 'X'); + String strValue = new String(value, Charsets.UTF_8); + int count = 0; + int rowSize = 0; + int totalSize = 0; + int valuesPerArray = 13; + while (rootWriter.start()) { + totalSize += rowSize; + rowSize = 0; + ScalarWriter array = rootWriter.array(0).scalar(); + for (int i = 0; i < valuesPerArray; i++) { + String cellValue = strValue + (count + 1) + "." + i; + array.setString(cellValue); + rowSize += cellValue.length(); + } + rootWriter.save(); + count++; + } + + // The loop count includes the overflow row; the harvested batch will not. + + int expectedCount = count - 1; + + // Size without overflow row should fit in the vector, size + // with overflow should not. + + assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE); + assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE); + + // Result should exclude the overflow row. Last row + // should hold the last full array. + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(expectedCount, result.rowCount()); + RowSetReader reader = result.reader(); + reader.set(expectedCount - 1); + ScalarElementReader arrayReader = reader.column(0).elements(); + assertEquals(valuesPerArray, arrayReader.size()); + for (int i = 0; i < valuesPerArray; i++) { + String cellValue = strValue + (count - 1) + "." + i; + assertEquals(cellValue, arrayReader.getString(i)); + } + result.clear(); + + // Next batch should start with the overflow row. + // The only row in this next batch should be the whole + // array being written at the time of overflow.
+ + rsLoader.startBatch(); +// VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5); +// ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter()); + assertEquals(1, rootWriter.rowCount()); + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + result = fixture.wrap(rsLoader.harvest()); +// VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5); + assertEquals(1, result.rowCount()); + reader = result.reader(); + reader.next(); + arrayReader = reader.column(0).elements(); + assertEquals(valuesPerArray, arrayReader.size()); + for (int i = 0; i < valuesPerArray; i++) { + String cellValue = strValue + (count) + "." + i; + assertEquals(cellValue, arrayReader.getString(i)); + } + result.clear(); + + rsLoader.close(); + } + + /** + * Test the complete set of array overflow cases: + * <ul> + * <li>Array a is written before the column that has overflow, + * and must be copied, in its entirety, to the overflow row.</li> + * <li>Column b causes the overflow.</li> + * <li>Column c is written after the overflow, and should go + * to the look-ahead row.</li> + * <li>Column d is written for a while, then has empties before + * the overflow row, but is written in the overflow row.</li> + * <li>Column e is like d, but is not written in the overflow + * row.</li> + * </ul> + */ + + @Test + public void testArrayOverflowWithOtherArrays() { + TupleMetadata schema = new SchemaBuilder() + .addArray("a", MinorType.INT) + .addArray("b", MinorType.VARCHAR) + .addArray("c", MinorType.INT) + .addArray("d", MinorType.INT) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + // Fill the batch with rows of four arrays. Tack on a suffix to each VARCHAR + // value so we can be sure the proper data is written and moved + // to the overflow batch. + + byte value[] = new byte[512]; + Arrays.fill(value, (byte) 'X'); + String strValue = new String(value, Charsets.UTF_8); + + int aCount = 3; + int bCount = 11; + int cCount = 5; + int dCount = 7; + + int cCutoff = ValueVector.MAX_BUFFER_SIZE / value.length / bCount / 2; + + ScalarWriter aWriter = rootWriter.array("a").scalar(); + ScalarWriter bWriter = rootWriter.array("b").scalar(); + ScalarWriter cWriter = rootWriter.array("c").scalar(); + ScalarWriter dWriter = rootWriter.array("d").scalar(); + + int count = 0; + rsLoader.startBatch(); + while (rootWriter.start()) { + for (int i = 0; i < aCount; i++) { + aWriter.setInt(count * aCount + i); + } + for (int i = 0; i < bCount; i++) { + String cellValue = strValue + (count * bCount + i); + bWriter.setString(cellValue); + } + if (count < cCutoff) { + for (int i = 0; i < cCount; i++) { + cWriter.setInt(count * cCount + i); + } + } + + // Relies on fact that isFull becomes true right after + // a vector overflows; don't have to wait for saveRow().
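+ // (The same early-isFull contract is what testSkipOverflowRow in + // TestResultSetLoaderOmittedValues relies on to discard an overflow row + // via: if (! rootWriter.isFull()) { rootWriter.save(); })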
+ + if (count < cCutoff || rootWriter.isFull()) { + for (int i = 0; i < dCount; i++) { + dWriter.setInt(count * dCount + i); + } + } + rootWriter.save(); + count++; + } + + // Verify + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(count - 1, result.rowCount()); + + RowSetReader reader = result.reader(); + ScalarElementReader aReader = reader.array("a").elements(); + ScalarElementReader bReader = reader.array("b").elements(); + ScalarElementReader cReader = reader.array("c").elements(); + ScalarElementReader dReader = reader.array("d").elements(); + + while (reader.next()) { + int rowId = reader.rowIndex(); + assertEquals(aCount, aReader.size()); + for (int i = 0; i < aCount; i++) { + assertEquals(rowId * aCount + i, aReader.getInt(i)); + } + assertEquals(bCount, bReader.size()); + for (int i = 0; i < bCount; i++) { + String cellValue = strValue + (rowId * bCount + i); + assertEquals(cellValue, bReader.getString(i)); + } + if (rowId < cCutoff) { + assertEquals(cCount, cReader.size()); + for (int i = 0; i < cCount; i++) { + assertEquals(rowId * cCount + i, cReader.getInt(i)); + } + assertEquals(dCount, dReader.size()); + for (int i = 0; i < dCount; i++) { + assertEquals(rowId * dCount + i, dReader.getInt(i)); + } + } else { + assertEquals(0, cReader.size()); + assertEquals(0, dReader.size()); + } + } + result.clear(); + int firstCount = count - 1; + + // One row is in the batch. Write more, skipping over the + // initial few values for columns c and d. Column d has a + // roll-over value, c has an empty roll-over. + + rsLoader.startBatch(); + for (int j = 0; j < 5; j++) { + rootWriter.start(); + for (int i = 0; i < aCount; i++) { + aWriter.setInt(count * aCount + i); + } + for (int i = 0; i < bCount; i++) { + String cellValue = strValue + (count * bCount + i); + bWriter.setString(cellValue); + } + if (j > 3) { + for (int i = 0; i < cCount; i++) { + cWriter.setInt(count * cCount + i); + } + for (int i = 0; i < dCount; i++) { + dWriter.setInt(count * dCount + i); + } + } + rootWriter.save(); + count++; + } + + result = fixture.wrap(rsLoader.harvest()); + assertEquals(6, result.rowCount()); + + reader = result.reader(); + aReader = reader.array("a").elements(); + bReader = reader.array("b").elements(); + cReader = reader.array("c").elements(); + dReader = reader.array("d").elements(); + + int j = 0; + while (reader.next()) { + int rowId = firstCount + reader.rowIndex(); + assertEquals(aCount, aReader.size()); + for (int i = 0; i < aCount; i++) { + assertEquals("Index " + i, rowId * aCount + i, aReader.getInt(i)); + } + assertEquals(bCount, bReader.size()); + for (int i = 0; i < bCount; i++) { + String cellValue = strValue + (rowId * bCount + i); + assertEquals(cellValue, bReader.getString(i)); + } + if (j > 4) { + assertEquals(cCount, cReader.size()); + for (int i = 0; i < cCount; i++) { + assertEquals(rowId * cCount + i, cReader.getInt(i)); + } + } else { + assertEquals(0, cReader.size()); + } + if (j == 0 || j > 4) { + assertEquals(dCount, dReader.size()); + for (int i = 0; i < dCount; i++) { + assertEquals(rowId * dCount + i, dReader.getInt(i)); + } + } else { + assertEquals(0, dReader.size()); + } + j++; + } + result.clear(); + + rsLoader.close(); + } + + /** + * Create an array that contains more than 64K values. Drill has no numeric + * limit on array lengths. (Well, it does, but the limit is about 2 billion + * which, even for bytes, is too large to fit into a vector...) 
+   */
+
+  @Test
+  public void testLargeArray() {
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+    RowSetLoader rootWriter = rsLoader.writer();
+    MaterializedField field = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REPEATED);
+    rootWriter.addColumn(field);
+
+    // Create a single array as the column value in the first row. When
+    // the vector overflows, an exception is thrown: with only one (giant)
+    // row in the batch, overflow handling is impossible.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    ScalarWriter array = rootWriter.array(0).scalar();
+    try {
+      for (int i = 0; i < Integer.MAX_VALUE; i++) {
+        array.setInt(i + 1);
+      }
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
+    rsLoader.close();
+  }
+
+  /**
+   * Test the case in which an array has "missing values" before the overflow.
+   */
+
+  @Test
+  public void testMissingArrayValues() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+
+    int blankAfter = ValueVector.MAX_BUFFER_SIZE / 512 * 2 / 3;
+    ScalarWriter cWriter = rootWriter.array("c").scalar();
+
+    rsLoader.startBatch();
+    int rowId = 0;
+    while (rootWriter.start()) {
+      rootWriter.scalar("a").setInt(rowId);
+      rootWriter.scalar("b").setBytes(value, value.length);
+      if (rowId < blankAfter) {
+        for (int i = 0; i < 3; i++) {
+          cWriter.setInt(rowId * 3 + i);
+        }
+      }
+      rootWriter.save();
+      rowId++;
+    }
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(rowId - 1, result.rowCount());
+    RowSetReader reader = result.reader();
+    ScalarElementReader cReader = reader.array("c").elements();
+    while (reader.next()) {
+      assertEquals(reader.rowIndex(), reader.scalar("a").getInt());
+      assertTrue(Arrays.equals(value, reader.scalar("b").getBytes()));
+      if (reader.rowIndex() < blankAfter) {
+        assertEquals(3, cReader.size());
+        for (int i = 0; i < 3; i++) {
+          assertEquals(reader.rowIndex() * 3 + i, cReader.getInt(i));
+        }
+      } else {
+        assertEquals(0, cReader.size());
+      }
+    }
+    result.clear();
+    rsLoader.close();
+  }
+
+  @Test
+  public void testOverflowWithNullables() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("n", MinorType.INT)
+        .addNullable("a", MinorType.VARCHAR)
+        .addNullable("b", MinorType.VARCHAR)
+        .addNullable("c", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (!rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(count);
+      rootWriter.scalar(1).setNull();
+      rootWriter.scalar(2).setBytes(value, value.length);
+      rootWriter.scalar(3).setNull();
+      rootWriter.save();
+      count++;
+    }
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count - 1, result.rowCount());
+
+    RowSetReader reader = result.reader();
+    while (reader.next()) {
+      assertEquals(reader.rowIndex(), reader.scalar(0).getInt());
+      assertTrue(reader.scalar(1).isNull());
+      assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+      assertTrue(reader.scalar(3).isNull());
+    }
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    result = fixture.wrap(rsLoader.harvest());
+    reader = result.reader();
+    assertEquals(1, result.rowCount());
+    assertTrue(reader.next());
+    assertEquals(count - 1, reader.scalar(0).getInt());
+    assertTrue(reader.scalar(1).isNull());
+    assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+    assertTrue(reader.scalar(3).isNull());
+    result.clear();
+
+    rsLoader.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
new file mode 100644
index 0000000..5c6ff7b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test of the basics of the projection mechanism.
+ */
+
+public class TestResultSetLoaderProjection extends SubOperatorTest {
+
+  @Test
+  public void testProjectionMap() {
+
+    // A null projection list means everything is projected
+
+    {
+      ProjectionSet projSet = ProjectionSetImpl.parse(null);
+      assertTrue(projSet instanceof NullProjectionSet);
+      assertTrue(projSet.isProjected("foo"));
+    }
+
+    // An empty projection list means everything is projected
+
+    {
+      ProjectionSet projSet = ProjectionSetImpl.parse(new ArrayList<SchemaPath>());
+      assertTrue(projSet instanceof NullProjectionSet);
+      assertTrue(projSet.isProjected("foo"));
+    }
+
+    // Simple non-map columns
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getSimplePath("foo"));
+      projCols.add(SchemaPath.getSimplePath("bar"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("foo"));
+      assertTrue(projSet.isProjected("bar"));
+      assertFalse(projSet.isProjected("mumble"));
+    }
+
+    // Whole-map projection. (Note that, at this level of abstraction,
+    // fully projected maps are identical to projected simple columns.)
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getSimplePath("map"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("map"));
+      assertFalse(projSet.isProjected("another"));
+      ProjectionSet mapProj = projSet.mapProjection("map");
+      assertNotNull(mapProj);
+      assertTrue(mapProj instanceof NullProjectionSet);
+      assertTrue(mapProj.isProjected("foo"));
+      assertNotNull(projSet.mapProjection("another"));
+      assertFalse(projSet.mapProjection("another").isProjected("anyCol"));
+    }
+
+    // Selected map projection, multiple levels, full projection
+    // at leaf level.
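+    //
+    // The projection list parsed below is map.a, map.b, map.map2.x,
+    // which should imply (rough sketch, not an API) the tree:
+    //
+    //   map      - explicit projection set: a, b, map2
+    //   map.b    - fully projected if "b" turns out to be a map
+    //   map.map2 - explicit projection set: x only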
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getCompoundPath("map", "a"));
+      projCols.add(SchemaPath.getCompoundPath("map", "b"));
+      projCols.add(SchemaPath.getCompoundPath("map", "map2", "x"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("map"));
+
+      // Map: an explicit map at top level
+
+      ProjectionSet mapProj = projSet.mapProjection("map");
+      assertTrue(mapProj instanceof ProjectionSetImpl);
+      assertTrue(mapProj.isProjected("a"));
+      assertTrue(mapProj.isProjected("b"));
+      assertTrue(mapProj.isProjected("map2"));
+      assertFalse(projSet.isProjected("bogus"));
+
+      // Map b: an implied nested map
+
+      ProjectionSet bMapProj = mapProj.mapProjection("b");
+      assertNotNull(bMapProj);
+      assertTrue(bMapProj instanceof NullProjectionSet);
+      assertTrue(bMapProj.isProjected("foo"));
+
+      // Map2, a nested map, has an explicit projection
+
+      ProjectionSet map2Proj = mapProj.mapProjection("map2");
+      assertNotNull(map2Proj);
+      assertTrue(map2Proj instanceof ProjectionSetImpl);
+      assertTrue(map2Proj.isProjected("x"));
+      assertFalse(map2Proj.isProjected("bogus"));
+    }
+  }
+
+  /**
+   * Test imposing a selection mask between the client and the underlying
+   * vector container.
+   */
+
+  @Test
+  public void testProjectionStatic() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("c"),
+        SchemaPath.getSimplePath("b"),
+        SchemaPath.getSimplePath("e"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    doProjectionTest(rsLoader);
+  }
+
+  @Test
+  public void testProjectionDynamic() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("c"),
+        SchemaPath.getSimplePath("b"),
+        SchemaPath.getSimplePath("e"));
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.REQUIRED));
+
+    doProjectionTest(rsLoader);
+  }
+
+  private void doProjectionTest(ResultSetLoader rsLoader) {
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // All columns appear in the writer schema, including non-projected ones.
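+    // (The loader creates a writer for every declared column, so clients
+    // can write without checking projection; values written to the
+    // non-projected columns are simply dropped from the harvested batch,
+    // as verified at the end of this method.)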
+
+    TupleMetadata actualSchema = rootWriter.schema();
+    assertEquals(4, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertEquals("c", actualSchema.column(2).getName());
+    assertEquals("d", actualSchema.column(3).getName());
+    assertEquals(0, actualSchema.index("A"));
+    assertEquals(3, actualSchema.index("d"));
+    assertEquals(-1, actualSchema.index("e"));
+
+    // Non-projected columns identify themselves via metadata
+
+    assertFalse(actualSchema.metadata("a").isProjected());
+    assertTrue(actualSchema.metadata("b").isProjected());
+    assertTrue(actualSchema.metadata("c").isProjected());
+    assertFalse(actualSchema.metadata("d").isProjected());
+
+    // Write some data. Doesn't need much.
+
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(i * 5);
+      rootWriter.scalar(1).setInt(i);
+      rootWriter.scalar(2).setInt(i * 10);
+      rootWriter.scalar(3).setInt(i * 20);
+      rootWriter.save();
+    }
+
+    // Verify. The result should contain only the projected columns that
+    // the loader actually defined (the projected name "e" has no matching
+    // column), in the order of definition.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1, 10)
+        .addRow(2, 20)
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    new RowSetComparison(expected)
+        .verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testMapProjection() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("m1"),
+        SchemaPath.getCompoundPath("m2", "d"));
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMap("m2")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.INT)
+          .buildMap()
+        .addMap("m3")
+          .add("e", MinorType.INT)
+          .add("f", MinorType.INT)
+          .buildMap()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify the projected columns
+
+    TupleMetadata actualSchema = rootWriter.schema();
+    ColumnMetadata m1Md = actualSchema.metadata("m1");
+    assertTrue(m1Md.isMap());
+    assertTrue(m1Md.isProjected());
+    assertEquals(2, m1Md.mapSchema().size());
+    assertTrue(m1Md.mapSchema().metadata("a").isProjected());
+    assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+
+    ColumnMetadata m2Md = actualSchema.metadata("m2");
+    assertTrue(m2Md.isMap());
+    assertTrue(m2Md.isProjected());
+    assertEquals(2, m2Md.mapSchema().size());
+    assertFalse(m2Md.mapSchema().metadata("c").isProjected());
+    assertTrue(m2Md.mapSchema().metadata("d").isProjected());
+
+    ColumnMetadata m3Md = actualSchema.metadata("m3");
+    assertTrue(m3Md.isMap());
+    assertFalse(m3Md.isProjected());
+    assertEquals(2, m3Md.mapSchema().size());
+    assertFalse(m3Md.mapSchema().metadata("e").isProjected());
+    assertFalse(m3Md.mapSchema().metadata("f").isProjected());
+
+    // Write a couple of rows.
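+    // (Equivalently, these could use the test-time shorthand, with one
+    // Object array per map column:
+    //   rootWriter.addRow(new Object[] {1, 2}, new Object[] {3, 4},
+    //       new Object[] {5, 6});
+    // though this test spells out each writer call.)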
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter.tuple("m1").scalar("a").setInt(1);
+    rootWriter.tuple("m1").scalar("b").setInt(2);
+    rootWriter.tuple("m2").scalar("c").setInt(3);
+    rootWriter.tuple("m2").scalar("d").setInt(4);
+    rootWriter.tuple("m3").scalar("e").setInt(5);
+    rootWriter.tuple("m3").scalar("f").setInt(6);
+    rootWriter.save();
+
+    rootWriter.start();
+    rootWriter.tuple("m1").scalar("a").setInt(11);
+    rootWriter.tuple("m1").scalar("b").setInt(12);
+    rootWriter.tuple("m2").scalar("c").setInt(13);
+    rootWriter.tuple("m2").scalar("d").setInt(14);
+    rootWriter.tuple("m3").scalar("e").setInt(15);
+    rootWriter.tuple("m3").scalar("f").setInt(16);
+    rootWriter.save();
+
+    // Verify. Only the projected columns appear in the result set.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addMap("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMap("m2")
+          .add("d", MinorType.INT)
+          .buildMap()
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(new Object[] {1, 2}, new Object[] {4})
+        .addRow(new Object[] {11, 12}, new Object[] {14})
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  /**
+   * Test a map array. Use the convenience methods to set values.
+   * Only the projected array members should appear in the harvested
+   * results.
+   */
+
+  @Test
+  public void testMapArrayProjection() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("m1"),
+        SchemaPath.getCompoundPath("m2", "d"));
+    TupleMetadata schema = new SchemaBuilder()
+        .addMapArray("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMapArray("m2")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.INT)
+          .buildMap()
+        .addMapArray("m3")
+          .add("e", MinorType.INT)
+          .add("f", MinorType.INT)
+          .buildMap()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(
+        new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+        new Object[] { new Object[] {30, 40}, new Object[] {31, 42}},
+        new Object[] { new Object[] {50, 60}, new Object[] {51, 62}});
+    rootWriter.addRow(
+        new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+        new Object[] { new Object[] {130, 140}, new Object[] {131, 142}},
+        new Object[] { new Object[] {150, 160}, new Object[] {151, 162}});
+
+    // Verify. Only the projected columns appear in the result set.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addMapArray("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMapArray("m2")
+          .add("d", MinorType.INT)
+          .buildMap()
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(
+            new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+            new Object[] { new Object[] {40}, new Object[] {42}})
+        .addRow(
+            new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+            new Object[] { new Object[] {140}, new Object[] {142}})
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  /**
+   * Verify that the projection code plays nice with vector overflow.
+   * Overflow is the most complex operation in this subsystem with many
+   * specialized methods that must work together flawlessly. This test
+   * ensures that non-projected columns stay in the background and don't
+   * interfere with overflow logic.
+   */
+
+  @Test
+  public void testProjectWithOverflow() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("small"),
+        SchemaPath.getSimplePath("dummy"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("big", MinorType.VARCHAR)
+        .add("small", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    byte big[] = new byte[600];
+    Arrays.fill(big, (byte) 'X');
+    byte small[] = new byte[512];
+    Arrays.fill(small, (byte) 'X');
+
+    rsLoader.startBatch();
+    int count = 0;
+    while (!rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(big, big.length);
+      rootWriter.scalar(1).setBytes(small, small.length);
+      rootWriter.save();
+      count++;
+    }
+
+    // Number of rows should be driven by size of the
+    // projected vector ("small"), not by the larger, unprojected
+    // "big" vector.
+    // Our row count should include the overflow row.
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / small.length;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+}