This is an automated email from the ASF dual-hosted git repository. progers pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit d96991aecf927f0249f1db438d3ae01e9100ba98 Author: Paul Rogers <par0...@gmail.com> AuthorDate: Sun Jul 17 21:27:11 2022 -0700 Unprojected union works --- .../physical/resultSet/impl/ColumnBuilder.java | 5 +- .../exec/physical/resultSet/impl/UnionState.java | 25 ++++-- .../impl/TestResultSetLoaderUnprojected.java | 11 ++- .../accessor/writer/ColumnWriterFactory.java | 18 ++-- .../accessor/writer/DummyUnionVectorShim.java | 98 ---------------------- .../exec/vector/accessor/writer/EmptyListShim.java | 5 +- .../vector/accessor/writer/UnionMemberShim.java | 59 +++++++++++++ .../exec/vector/accessor/writer/UnionShim.java | 1 - .../vector/accessor/writer/UnionVectorShim.java | 63 +++++++++++--- .../vector/accessor/writer/UnionWriterImpl.java | 27 +++++- 10 files changed, 176 insertions(+), 136 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java index 04fe811b71..af32d15e3e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java @@ -320,10 +320,8 @@ public class ColumnBuilder { // vectors can be cached. assert columnSchema.variantSchema().size() == 0; final UnionVector vector; - final UnionWriterImpl unionWriter; if (parent.projection().projection(columnSchema).isProjected || allowCreation(parent)) { vector = new UnionVector(columnSchema.schema(), parent.loader().allocator(), null); - unionWriter = new UnionWriterImpl(columnSchema, vector, null); } else { vector = null; } @@ -337,7 +335,8 @@ public class ColumnBuilder { // Create the manager for the columns within the union. final UnionState unionState = new UnionState(parent.loader(), - parent.vectorCache().childCache(columnSchema.name())); + parent.vectorCache().childCache(columnSchema.name()), + vector == null ? ProjectionFilter.PROJECT_NONE : ProjectionFilter.PROJECT_ALL); // Bind the union state to the union writer to handle column additions. unionWriter.bindListener(unionState); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java index 02cc90c0d9..df214770f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/UnionState.java @@ -95,12 +95,16 @@ public class UnionState extends ContainerState public static class UnionVectorState implements VectorState { private final UnionVector vector; - private final SimpleVectorState typesVectorState; + private final VectorState typesVectorState; public UnionVectorState(UnionVector vector, UnionWriterImpl unionWriter) { this.vector = vector; - typesVectorState = new FixedWidthVectorState( - ((UnionVectorShim) unionWriter.shim()).typeWriter(), vector.getTypeVector()); + if (vector == null) { + typesVectorState = new NullVectorState(); + } else { + typesVectorState = new FixedWidthVectorState( + ((UnionVectorShim) unionWriter.shim()).typeWriter(), vector.getTypeVector()); + } } @Override @@ -133,7 +137,7 @@ public class UnionState extends ContainerState public UnionVector vector() { return vector; } @Override - public boolean isProjected() { return true; } + public boolean isProjected() { return vector != null; } @Override public void dump(HierarchicalFormatter format) { @@ -147,11 +151,10 @@ public class UnionState extends ContainerState * vectors in the union, * and matches the set of child writers in the union writer. */ - private final Map<MinorType, ColumnState> columns = new HashMap<>(); - public UnionState(LoaderInternals events, ResultVectorCache vectorCache) { - super(events, vectorCache); + public UnionState(LoaderInternals events, ResultVectorCache vectorCache, ProjectionFilter projectionSet) { + super(events, vectorCache, projectionSet); } public UnionWriterImpl writer() { @@ -183,7 +186,13 @@ public class UnionState extends ContainerState protected void addColumn(ColumnState colState) { assert ! columns.containsKey(colState.schema().type()); columns.put(colState.schema().type(), colState); - vector().addType(colState.vector()); + if (vector() == null) { + if (colState.vector() != null) { + throw new IllegalStateException("Attempt to add a materialized vector to an unprojected vector"); + } + } else { + vector().addType(colState.vector()); + } } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java index d0415b615c..50271cab8b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderUnprojected.java @@ -15,6 +15,7 @@ import org.apache.drill.exec.physical.resultSet.project.Projections; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetTestUtils; import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet; +import org.apache.drill.exec.physical.rowSet.RowSetFormatter; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ArrayWriter; @@ -292,12 +293,12 @@ public class TestResultSetLoaderUnprojected extends SubOperatorTest { assertFalse(rootWriter.column("b").isProjected()); rsLoader.startBatch(); rootWriter.start(); - rootWriter.scalar(0).setInt(1); - rootWriter.scalar(1).setInt(5); + rootWriter.variant(0).scalar(MinorType.INT).setInt(1); + rootWriter.variant(1).scalar(MinorType.INT).setInt(5); rootWriter.save(); rootWriter.start(); - rootWriter.scalar(0).setString("2"); - rootWriter.scalar(1).setString("10"); + rootWriter.variant(0).scalar(MinorType.VARCHAR).setString("2"); + rootWriter.variant(1).scalar(MinorType.VARCHAR).setString("10"); rootWriter.save(); TupleMetadata expectedSchema = new SchemaBuilder() @@ -311,6 +312,8 @@ public class TestResultSetLoaderUnprojected extends SubOperatorTest { .addRow("2") .build(); RowSet actual = fixture.wrap(rsLoader.harvest()); + RowSetFormatter.print(actual); + RowSetFormatter.print(expected); RowSetUtilities.verify(expected, actual); rsLoader.close(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java index 2b22efc60b..f610473685 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java @@ -125,15 +125,15 @@ public class ColumnWriterFactory { final ScalarObjectWriter scalarWriter = new ScalarObjectWriter( new DummyScalarWriter(schema)); switch (schema.mode()) { - case OPTIONAL: - case REQUIRED: - return scalarWriter; - case REPEATED: - return new ArrayObjectWriter( - new DummyArrayWriter(schema, - scalarWriter)); - default: - throw new UnsupportedOperationException(schema.mode().toString()); + case OPTIONAL: + case REQUIRED: + return scalarWriter; + case REPEATED: + return new ArrayObjectWriter( + new DummyArrayWriter(schema, + scalarWriter)); + default: + throw new UnsupportedOperationException(schema.mode().toString()); } } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DummyUnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DummyUnionVectorShim.java deleted file mode 100644 index b75b938c25..0000000000 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DummyUnionVectorShim.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.apache.drill.exec.vector.accessor.writer; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; -import org.apache.drill.exec.vector.accessor.ObjectWriter; -import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; - -/** - * Shim for a non-existent (unprojected) union vector. - */ -public class DummyUnionVectorShim implements UnionShim { - - @Override - public void bindIndex(ColumnWriterIndex index) { } - - @Override - public void bindListener(ColumnWriterListener listener) { } - - @Override - public void startWrite() { } - - @Override - public void startRow() { } - - @Override - public void endArrayValue() { } - - @Override - public void restartRow() { } - - @Override - public void saveRow() { } - - @Override - public void endWrite() { } - - @Override - public void preRollover() { } - - @Override - public void postRollover() { } - - @Override - public void dump(HierarchicalFormatter format) { } - - @Override - public int writeIndex() { return 0; } - - @Override - public void bindWriter(UnionWriterImpl writer) { } - - @Override - public void setNull() { } - - @Override - public boolean hasType(MinorType type) { - // TODO Auto-generated method stub - return false; - } - - @Override - public ObjectWriter member(MinorType type) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setType(MinorType type) { - // TODO Auto-generated method stub - } - - @Override - public int lastWriteIndex() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int rowStartIndex() { return 0; } - - @Override - public AbstractObjectWriter addMember(ColumnMetadata colSchema) { - // TODO Auto-generated method stub - return null; - } - - @Override - public AbstractObjectWriter addMember(MinorType type) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void addMember(AbstractObjectWriter colWriter) { - // TODO Auto-generated method stub - } -} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java index f3e2590dc5..064b63a442 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java @@ -32,7 +32,6 @@ import com.google.common.base.Preconditions; * the case that a list may eventually hold a union, but at present * it holds nothing. */ - public class EmptyListShim implements UnionShim { private UnionWriterImpl writer; @@ -116,7 +115,6 @@ public class EmptyListShim implements UnionShim { * @param colWriter the column writer returned from the listener * @return the same column writer */ - private AbstractObjectWriter doAddMember(AbstractObjectWriter colWriter) { // Something went terribly wrong if the check below fails. Preconditions.checkState(writer.shim() != this); @@ -146,4 +144,7 @@ public class EmptyListShim implements UnionShim { public void dump(HierarchicalFormatter format) { format.startObject(this).endObject(); } + + @Override + public boolean isProjected() { return true; } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java new file mode 100644 index 0000000000..0cea513567 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionMemberShim.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor.writer; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.UnionVector; + +/** + * Retrieves the member vectors for a union vector. + */ +interface UnionMemberShim { + + ValueVector getMember(MinorType type); + boolean isProjected(); + + class UnionMemberShimImpl implements UnionMemberShim { + private final UnionVector vector; + + protected UnionMemberShimImpl(UnionVector vector) { + this.vector = vector; + } + + @Override + public ValueVector getMember(MinorType type) { + return vector.getMember(type); + } + + @Override + public boolean isProjected() { return true; } + } + + class DummyUnionMemberShim implements UnionMemberShim { + + @Override + public ValueVector getMember(MinorType type) { + return null; + } + + @Override + public boolean isProjected() { return false; } + } + +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java index 4bfafa9ae4..f21bf18e52 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionShim.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.vector.accessor.writer; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ObjectWriter; -import org.apache.drill.exec.vector.complex.UnionVector; /** * Unions are overly complex. They can evolve from no type, to a single type, diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java index c9133d207b..7c5b6271d1 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java @@ -17,15 +17,20 @@ */ package org.apache.drill.exec.vector.accessor.writer; +import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.AbstractFixedWidthWriter.BaseFixedWidthWriter; -import org.apache.drill.exec.vector.accessor.writer.UnionShim.AbstractUnionShim; +import org.apache.drill.exec.vector.accessor.writer.UnionMemberShim.DummyUnionMemberShim; +import org.apache.drill.exec.vector.accessor.writer.UnionMemberShim.UnionMemberShimImpl; +import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim; +import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter; import org.apache.drill.exec.vector.complex.UnionVector; /** @@ -37,7 +42,7 @@ import org.apache.drill.exec.vector.complex.UnionVector; * list itself evolves from no type, to a single type and to * a union. */ -public class UnionVectorShim extends AbstractUnionShim { +public class UnionVectorShim implements UnionShim { static class DefaultListener implements VariantWriterListener { @@ -55,7 +60,10 @@ public class UnionVectorShim extends AbstractUnionShim { // which will create the member metadata. This means that the type // will already be in the variant schema by the time we add the // writer to the variant writer in a few steps from now. - final ValueVector memberVector = shim.vector.getMember(type); + // + // When a variant is unprojected, the member vector will be null, + // which will cause a dummy writer to be created. + final ValueVector memberVector = shim.unionMemberShim.getMember(type); final ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type); return ColumnWriterFactory.buildColumnWriter(memberSchema, memberVector); } @@ -66,7 +74,8 @@ public class UnionVectorShim extends AbstractUnionShim { } } - private final UnionVector vector; + private final UnionMemberShim unionMemberShim; + private final AbstractObjectWriter variants[]; private UnionWriterImpl writer; /** @@ -74,18 +83,28 @@ public class UnionVectorShim extends AbstractUnionShim { * says which union member holds the value for each row. The type vector * can also indicate if the value is null. */ - private final BaseScalarWriter typeWriter; + private final AbstractScalarWriterImpl typeWriter; public UnionVectorShim(UnionVector vector) { - this.vector = vector; + this.unionMemberShim = vector == null ? new DummyUnionMemberShim() : new UnionMemberShimImpl(vector); typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector()); + variants = new AbstractObjectWriter[MinorType.values().length]; } public UnionVectorShim(UnionVector vector, AbstractObjectWriter variants[]) { - super(variants); - this.vector = vector; - typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector()); + if (vector == null) { + this.unionMemberShim = new DummyUnionMemberShim(); + this.typeWriter = new DummyScalarWriter(new PrimitiveColumnMetadata("$type", MinorType.UINT1, DataMode.REQUIRED)); + } else { + this.unionMemberShim = new UnionMemberShimImpl(vector); + this.typeWriter = ColumnWriterFactory.newWriter(vector.getTypeVector()); + } + if (variants == null) { + this.variants = new AbstractObjectWriter[MinorType.values().length]; + } else { + this.variants = variants; + } } @Override @@ -121,6 +140,11 @@ public class UnionVectorShim extends AbstractUnionShim { typeWriter.setInt(UnionVector.NULL_MARKER); } + @Override + public boolean hasType(MinorType type) { + return variants[type.ordinal()] != null; + } + @Override public ObjectWriter member(MinorType type) { final AbstractObjectWriter colWriter = variants[type.ordinal()]; @@ -165,6 +189,23 @@ public class UnionVectorShim extends AbstractUnionShim { writer.addMember(colWriter); } + /** + * Performs just the work of adding a vector to the list of existing + * variants. Called when adding a type via the writer, but also when + * the result set loader promotes a list from single type to a union, + * and provides this shim with the writer from the single-list shim. + * In the latter case, the writer is already initialized and is already + * part of the metadata for this list; so we don't want to call the + * list's {@code addMember()} and repeat those operations. + * + * @param colWriter the column (type) writer to add + */ + public void addMemberWriter(AbstractObjectWriter colWriter) { + final MinorType type = colWriter.schema().type(); + assert variants[type.ordinal()] == null; + variants[type.ordinal()] = colWriter; + } + @Override public void startWrite() { typeWriter.startWrite(); @@ -245,7 +286,6 @@ public class UnionVectorShim extends AbstractUnionShim { } } - /** * Return the writer for the types vector. To be used only by the row set * loader overflow logic; never by the application (which is why the method @@ -280,4 +320,7 @@ public class UnionVectorShim extends AbstractUnionShim { typeWriter.dump(format); format.endObject(); } + + @Override + public boolean isProjected() { return unionMemberShim.isProjected(); } } \ No newline at end of file diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java index 7b5ef17804..33642cc770 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java @@ -55,6 +55,31 @@ import org.joda.time.Period; */ public class UnionWriterImpl implements VariantWriter, WriterEvents { + public interface UnionShim extends WriterEvents { + void bindWriter(UnionWriterImpl writer); + void setNull(); + boolean hasType(MinorType type); + + /** + * Return an existing writer for the given type, or create a new one + * if needed. + * + * @param type desired variant type + * @return a writer for that type + */ + + ObjectWriter member(MinorType type); + void setType(MinorType type); + @Override + int lastWriteIndex(); + @Override + int rowStartIndex(); + AbstractObjectWriter addMember(ColumnMetadata colSchema); + AbstractObjectWriter addMember(MinorType type); + void addMember(AbstractObjectWriter colWriter); + boolean isProjected(); + } + public static class VariantObjectWriter extends AbstractObjectWriter { private final UnionWriterImpl writer; @@ -256,7 +281,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { } @Override - public boolean isProjected() { return true; } + public boolean isProjected() { return shim.isProjected(); } @Override public void startWrite() {