This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 8087223 DRILL-7377: Nested schemas for dynamic EVF columns 8087223 is described below commit 8087223667c01f141f72e0383a0c4f42af99f5ac Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Sun Oct 6 22:09:44 2019 -0700 DRILL-7377: Nested schemas for dynamic EVF columns The Result Set Loader (part of EVF) allows adding columns up-front before reading rows (so-called "early schema"). Such schemas allow nested columns (maps with members, repeated lists with a type, etc.). The Result Set Loader also allows adding columns dynamically while loading data (so-called "late schema"). Previously, the code assumed that columns would be added top-down: first the map, then the map's contents, etc. Charles found a need to allow adding a nested column (a repeated list with a declared list type). This patch revises the code to use the same mechanism in both the early- and late-schema cases, allowing adding nested columns at any time. Testing: Added a new unit test case for the late-schema case of a repeated list with content. 
--- .../physical/resultSet/impl/BuildFromSchema.java | 94 +++++++++++++++++++--- .../physical/resultSet/impl/ColumnBuilder.java | 6 ++ .../resultSet/impl/ResultSetLoaderImpl.java | 2 +- .../exec/physical/resultSet/impl/TupleState.java | 15 +--- .../impl/TestResultSetLoaderRepeatedList.java | 73 ++++++++++++----- .../accessor/writer/AbstractTupleWriter.java | 26 ++++-- 6 files changed, 162 insertions(+), 54 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java index e0d9e27..bf1256d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java @@ -32,6 +32,21 @@ import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter; * <p> * Recursion is much easier if we can go bottom-up. But, writers * require top-down construction. + * <p> + * This particular class builds a column and all its contents. + * For example, given a map, which contains a repeated list which + * contains a repeated INT, this class first builds the map, + * then adds the repeated list, then adds the INT array. To do + * so, it will create a copy of the structured metadata. + * <p> + * A drawback of this approach is that the metadata objects used + * in the "parent" writers will be copies of, not the same as, those + * in the schema from which we are building the writers. At present, + * this is not an issue, but it is something to be aware of as uses + * become more sophisticated. + * <p> + * This class contrasts with the @{link ColumnBuilder} class which + * builds the structure within a single vector and writer. 
*/ public class BuildFromSchema { @@ -49,6 +64,11 @@ public class BuildFromSchema { ObjectWriter add(ColumnMetadata colSchema); } + /** + * Shim used for adding a column to a tuple directly. + * This method will recursively invoke this builder + * to expand any nested content. + */ private static class TupleShim implements ParentShim { private final TupleWriter writer; @@ -63,6 +83,24 @@ public class BuildFromSchema { } } + /** + * Shim used when implementing the add of a column to + * a tuple in the result set loader. Directly calls the + * internal method to add a column to the "tuple state." + */ + private static class TupleStateShim implements ParentShim { + private final TupleState state; + + public TupleStateShim(TupleState state) { + this.state = state; + } + + @Override + public ObjectWriter add(ColumnMetadata colSchema) { + return state.addColumn(colSchema).writer(); + } + } + private static class UnionShim implements ParentShim { private final VariantWriter writer; @@ -89,6 +127,12 @@ public class BuildFromSchema { } } + private static BuildFromSchema instance = new BuildFromSchema(); + + private BuildFromSchema() { } + + public static BuildFromSchema instance() { return instance; } + /** * When creating a schema up front, provide the schema of the desired tuple, * then build vectors and writers to match. Allows up-front schema definition @@ -105,17 +149,36 @@ public class BuildFromSchema { } } - private void buildColumn(ParentShim parent, ColumnMetadata colSchema) { + /** + * Build a column recursively. Called internally when adding a column + * via the addColumn() method on the tuple writer. + */ + + public ObjectWriter buildColumn(TupleState state, ColumnMetadata colSchema) { + return buildColumn(new TupleStateShim(state), colSchema); + } + + /** + * Build the column writer, and any nested content, returning the built + * column writer as a generic object writer. 
+ * + * @param parent the shim that implements the logic to add a column + * to a tuple, list, repeated list, or union. + * @param colSchema the schema of the column to add + * @return the object writer for the added column + */ + + private ObjectWriter buildColumn(ParentShim parent, ColumnMetadata colSchema) { if (colSchema.isMultiList()) { - buildRepeatedList(parent, colSchema); + return buildRepeatedList(parent, colSchema); } else if (colSchema.isMap()) { - buildMap(parent, colSchema); + return buildMap(parent, colSchema); } else if (isSingleList(colSchema)) { - buildSingleList(parent, colSchema); + return buildSingleList(parent, colSchema); } else if (colSchema.isVariant()) { - buildVariant(parent, colSchema); + return buildVariant(parent, colSchema); } else { - buildPrimitive(parent, colSchema); + return buildPrimitive(parent, colSchema); } } @@ -133,13 +196,14 @@ public class BuildFromSchema { return colSchema.isVariant() && colSchema.isArray() && colSchema.variantSchema().isSingleType(); } - private void buildPrimitive(ParentShim parent, ColumnMetadata colSchema) { - parent.add(colSchema); + private ObjectWriter buildPrimitive(ParentShim parent, ColumnMetadata colSchema) { + return parent.add(colSchema); } - private void buildMap(ParentShim parent, ColumnMetadata colSchema) { + private ObjectWriter buildMap(ParentShim parent, ColumnMetadata colSchema) { final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty()); expandMap(colWriter, colSchema); + return colWriter; } private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) { @@ -162,9 +226,10 @@ public class BuildFromSchema { * @param colSchema the schema of the variant (LIST or UNION) column */ - private void buildVariant(ParentShim parent, ColumnMetadata colSchema) { + private ObjectWriter buildVariant(ParentShim parent, ColumnMetadata colSchema) { final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty()); expandVariant(colWriter, colSchema); + return colWriter; } private void 
expandVariant(ObjectWriter colWriter, ColumnMetadata colSchema) { @@ -182,13 +247,14 @@ public class BuildFromSchema { } } - private void buildSingleList(ParentShim parent, ColumnMetadata colSchema) { + private ObjectWriter buildSingleList(ParentShim parent, ColumnMetadata colSchema) { final ColumnMetadata seed = colSchema.cloneEmpty(); final ColumnMetadata subtype = colSchema.variantSchema().listSubtype(); seed.variantSchema().addType(subtype.cloneEmpty()); seed.variantSchema().becomeSimple(); final ObjectWriter listWriter = parent.add(seed); expandColumn(listWriter, subtype); + return listWriter; } /** @@ -200,14 +266,16 @@ public class BuildFromSchema { * @param colSchema schema definition of the array */ - private void buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) { + private ObjectWriter buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) { final ColumnMetadata seed = colSchema.cloneEmpty(); - final RepeatedListWriter listWriter = (RepeatedListWriter) parent.add(seed).array(); + final ObjectWriter objWriter = parent.add(seed); + final RepeatedListWriter listWriter = (RepeatedListWriter) objWriter.array(); final ColumnMetadata elements = colSchema.childSchema(); if (elements != null) { final RepeatedListShim listShim = new RepeatedListShim(listWriter); buildColumn(listShim, elements); } + return objWriter; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java index 466214e..57b07fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java @@ -78,7 +78,13 @@ import org.apache.drill.exec.vector.complex.UnionVector; * projected (not in the list), then creates a dummy writer. 
Issues an error if * the column is projected, but the implied projection type is incompatible with * the actual type. (Such as trying to project an INT as x[0].) + * <p> + * This class builds the internal structure of a vector. If building a "container" + * vector (map, list, repeated list or union), this class expects the container + * to be added empty, then the members to be added one by one. See + * {@link BuildFromSchema} for the class that builds up a compound structure. */ + public class ColumnBuilder { /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java index 2cfdeec..4d70065 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java @@ -310,7 +310,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals { // won't be if known up front. 
logger.debug("Schema: " + options.schema.toString()); - new BuildFromSchema().buildTuple(rootWriter, options.schema); + BuildFromSchema.instance().buildTuple(rootWriter, options.schema); } // If we want to project nothing, then we do, in fact, have a diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java index 0c6c6af..9d8f798 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.resultSet.ProjectionSet; import org.apache.drill.exec.physical.resultSet.ResultVectorCache; import org.apache.drill.exec.physical.resultSet.impl.ColumnState.BaseContainerColumnState; @@ -490,19 +489,7 @@ public abstract class TupleState extends ContainerState @Override public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) { - - // Verify name is not a (possibly case insensitive) duplicate. 
- - final TupleMetadata tupleSchema = schema(); - final String colName = columnSchema.name(); - if (tupleSchema.column(colName) != null) { - throw UserException - .validationError() - .message("Duplicate column name: ", colName) - .build(ResultSetLoaderImpl.logger); - } - - return addColumn(columnSchema).writer(); + return BuildFromSchema.instance().buildColumn(this, columnSchema); } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java index 880b2d8..2010cc3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java @@ -17,22 +17,17 @@ */ package org.apache.drill.exec.physical.resultSet.impl; -import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; -import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray; -import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.util.Arrays; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.Types; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet; +import org.apache.drill.exec.physical.rowSet.RowSetReader; import 
org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType; @@ -48,14 +43,19 @@ import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.ValueType; import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter; +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.test.SubOperatorTest; -import org.apache.drill.exec.physical.rowSet.RowSet; -import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.apache.drill.test.rowSet.RowSetUtilities; -import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +import static org.apache.drill.test.rowSet.RowSetUtilities.objArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray; +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests repeated list support. 
Repeated lists add another layer of dimensionality @@ -173,6 +173,9 @@ public class TestResultSetLoaderRepeatedList extends SubOperatorTest { public void test2DLateSchemaIncremental() { final TupleMetadata schema = new SchemaBuilder() .add("id", MinorType.INT) + .addRepeatedList("list1") + .addArray(MinorType.VARCHAR) + .resumeSchema() .addRepeatedList("list2") .addArray(MinorType.VARCHAR) .resumeSchema() @@ -199,7 +202,7 @@ public class TestResultSetLoaderRepeatedList extends SubOperatorTest { // Sanity check of writer structure assertEquals(2, writer.size()); - final ObjectWriter listObj = writer.column("list2"); + final ObjectWriter listObj = writer.column("list1"); assertEquals(ObjectType.ARRAY, listObj.type()); final ArrayWriter listWriter = listObj.array(); @@ -221,7 +224,7 @@ public class TestResultSetLoaderRepeatedList extends SubOperatorTest { // Define the inner type. final RepeatedListWriter listWriterImpl = (RepeatedListWriter) listWriter; - listWriterImpl.defineElement(MaterializedField.create("list2", Types.repeated(MinorType.VARCHAR))); + listWriterImpl.defineElement(MaterializedField.create("list1", Types.repeated(MinorType.VARCHAR))); // Sanity check of completed structure @@ -234,17 +237,45 @@ public class TestResultSetLoaderRepeatedList extends SubOperatorTest { // Write values writer - .addRow(5, objArray(strArray("a", "b"), strArray("c", "d"))); + .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1"))); + + // Add the second list, with a complete type + + writer.addColumn(schema.metadata(2)); + + // Sanity check of writer structure + + assertEquals(3, writer.size()); + final ObjectWriter list2Obj = writer.column("list2"); + assertEquals(ObjectType.ARRAY, list2Obj.type()); + final ArrayWriter list2Writer = list2Obj.array(); + assertEquals(ObjectType.ARRAY, list2Writer.entryType()); + final ArrayWriter inner2Writer = list2Writer.array(); + assertEquals(ObjectType.SCALAR, inner2Writer.entryType()); + final ScalarWriter str2Writer = 
inner2Writer.scalar(); + assertEquals(ValueType.STRING, str2Writer.valueType()); + + // Write values + + writer + .addRow(6, + objArray(strArray("a2", "b2"), strArray("c2", "d2")), + objArray(strArray("w2", "x2"), strArray("y2", "z2"))); + + // Add the second list, with a complete type // Verify the values. // (Relies on the row set level repeated list tests having passed.) final RowSet expected = fixture.rowSetBuilder(schema) - .addRow(1, objArray()) - .addRow(2, objArray()) - .addRow(3, objArray()) - .addRow(4, objArray(objArray(), null)) - .addRow(5, objArray(strArray("a", "b"), strArray("c", "d"))) + .addRow(1, objArray(), objArray()) + .addRow(2, objArray(), objArray()) + .addRow(3, objArray(), objArray()) + .addRow(4, objArray(objArray(), null), objArray()) + .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1")), objArray()) + .addRow(6, + objArray(strArray("a2", "b2"), strArray("c2", "d2")), + objArray(strArray("w2", "x2"), strArray("y2", "z2"))) .build(); RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest())); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java index 659d9da..948b8c7 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.vector.accessor.writer; import java.util.ArrayList; import java.util.List; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.TupleMetadata; @@ -32,6 +33,8 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.TupleWriter; import 
org.apache.drill.exec.vector.accessor.VariantWriter; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation for a writer for a tuple (a row or a map.) Provides access to each @@ -143,6 +146,8 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { ObjectWriter addColumn(TupleWriter tuple, MaterializedField field); } + protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class); + protected final TupleMetadata tupleSchema; protected final List<AbstractObjectWriter> writers; protected ColumnWriterIndex vectorIndex; @@ -205,20 +210,29 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { @Override public int addColumn(ColumnMetadata column) { - if (listener == null) { - throw new UnsupportedOperationException("addColumn"); - } + verifyAddColumn(column.name()); final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column); return addColumnWriter(colWriter); } @Override public int addColumn(MaterializedField field) { + verifyAddColumn(field.getName()); + final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field); + return addColumnWriter(colWriter); + } + + private void verifyAddColumn(String colName) { if (listener == null) { throw new UnsupportedOperationException("addColumn"); } - final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field); - return addColumnWriter(colWriter); + + if (tupleSchema().column(colName) != null) { + throw UserException + .validationError() + .message("Duplicate column name: ", colName) + .build(logger); + } } @Override @@ -429,6 +443,8 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { this.listener = listener; } + public TupleWriterListener listener() { return listener; } + @Override public void 
bindListener(ColumnWriterListener listener) { }