This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 2c1a8a0 DRILL-7257: Set nullable var-width vector lastSet value 2c1a8a0 is described below commit 2c1a8a0afa3a1d8853e7651f1b3d47d52969f50a Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Thu May 16 14:54:39 2019 -0700 DRILL-7257: Set nullable var-width vector lastSet value The problem fixed here turns out to be due to a subtle issue with variable-width nullable vectors. Such vectors have a lastSet attribute in the Mutator class. When using "transfer pairs" to copy values, the code somehow decides to zero-fill from the lastSet value to the record count. The row set framework did not set the lastSet value, meaning that the RemovingRecordBatch zero-filled the dir0 column when it chose to use transfer pairs rather than copying values. The use of transfer pairs occurs when all rows in a batch pass the filter prior to the removing record batch. Modified the nullable vector writer to properly set the lastSet value at the end of each batch. Added a unit test to verify the value is set correctly. Includes a bit of code clean-up. 
--- .../physical/impl/svremover/AbstractSV2Copier.java | 19 +++++++--------- .../physical/impl/svremover/GenericSV2Copier.java | 8 +++---- .../apache/drill/exec/record/VectorAccessible.java | 2 +- .../apache/drill/exec/record/VectorContainer.java | 5 ++--- .../exec/record/selection/SelectionVector2.java | 10 ++++----- .../test/rowSet/test/TestScalarAccessors.java | 13 +++++++---- .../codegen/templates/NullableValueVectors.java | 25 ++++++++++++---------- .../apache/drill/exec/vector/NullableVector.java | 1 + .../accessor/writer/NullableScalarWriter.java | 3 +++ 9 files changed, 46 insertions(+), 40 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java index d273fd3..ed33703 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.physical.impl.svremover; +import java.util.ArrayList; +import java.util.List; + import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; @@ -24,9 +27,6 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; -import java.util.ArrayList; -import java.util.List; - public abstract class AbstractSV2Copier extends AbstractCopier { protected ValueVector[] vvIn; private SelectionVector2 sv2; @@ -35,21 +35,18 @@ public abstract class AbstractSV2Copier extends AbstractCopier { @Override public void setup(VectorAccessible incoming, VectorContainer outgoing) { super.setup(incoming, outgoing); - this.sv2 = incoming.getSelectionVector2(); + sv2 = 
incoming.getSelectionVector2(); final int count = outgoing.getNumberOfColumns(); vvIn = new ValueVector[count]; - { - int index = 0; - - for (VectorWrapper vectorWrapper: incoming) { - vvIn[index] = vectorWrapper.getValueVector(); - index++; - } + int index = 0; + for (VectorWrapper<?> vectorWrapper: incoming) { + vvIn[index++] = vectorWrapper.getValueVector(); } } + @Override public void copyEntryIndirect(int inIndex, int outIndex) { copyEntry(sv2.getIndex(inIndex), outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java index f607e8c..d2f8dac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.physical.impl.svremover; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.SchemaChangeCallBack; @@ -28,15 +27,14 @@ public class GenericSV2Copier extends AbstractSV2Copier { public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer, SchemaChangeCallBack callBack) { for(VectorWrapper<?> vv : incomingBatch){ - TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)); - transferPairs.add(pair); + transferPairs.add(vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack))); } } @Override public void copyEntry(int inIndex, int outIndex) { - for ( int i = 0; i < vvIn.length; i++ ) { + for (int i = 0; i < vvIn.length; i++) { vvOut[i].copyEntry(outIndex, vvIn[i], inIndex); } } -} \ No newline at end of file +} diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java index 63dab62..f51f521 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -23,7 +23,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; // TODO javadoc public interface VectorAccessible extends Iterable<VectorWrapper<?>> { - // TODO are these <?> releated in any way? Should they be the same one? + // TODO are these <?> related in any way? Should they be the same one? // TODO javadoc VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 05d9510..4a90184 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; - import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; @@ -376,8 +375,8 @@ public class VectorContainer implements VectorAccessible { } public void setRecordCount(int recordCount) { - this.recordCount = recordCount; - initialized = true; + this.recordCount = recordCount; + initialized = true; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 8afc5fb..859d492 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -17,12 +17,12 @@ */ package org.apache.drill.exec.record.selection; -import io.netty.buffer.DrillBuf; - -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.DeadBuf; +import io.netty.buffer.DrillBuf; + /** * A selection vector that fronts, at most, 64K values. * The selection vector is used for two cases: @@ -158,12 +158,12 @@ public class SelectionVector2 implements AutoCloseable { } public void setRecordCount(int recordCount){ -// logger.debug("Seting record count to {}", recordCount); +// logger.debug("Setting record count to {}", recordCount); this.recordCount = recordCount; } public boolean canDoFullTransfer() { - return (recordCount == batchActualRecordCount); + return recordCount == batchActualRecordCount; } public void setBatchActualRecordCount(int actualRecordCount) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java index 56c6af2..582c2f4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java @@ -17,12 +17,12 @@ */ package org.apache.drill.test.rowSet.test; +import static org.apache.drill.test.rowSet.RowSetUtilities.dec; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static 
org.junit.Assert.fail; -import static org.apache.drill.test.rowSet.RowSetUtilities.dec; import java.math.BigDecimal; import java.util.Arrays; @@ -32,26 +32,28 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.SimpleVectorWrapper; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.DateUtilities; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ArrayReader; import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.ValueType; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetReader; import org.joda.time.DateTimeZone; import org.joda.time.Instant; import org.joda.time.LocalDate; import org.joda.time.LocalTime; import org.joda.time.Period; -import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetBuilder; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; /** * Verify that simple scalar (non-repeated) column readers @@ -678,6 +680,9 @@ public class TestScalarAccessors extends SubOperatorTest { .addRow("abcd") .build(); assertEquals(3, rs.rowCount()); + SimpleVectorWrapper<?> vw = (SimpleVectorWrapper<?>) rs.container().getValueVector(0); + NullableVarCharVector v = (NullableVarCharVector) 
vw.getValueVector(); + assertEquals(3, v.getMutator().getLastSet()); RowSetReader reader = rs.reader(); ScalarReader colReader = reader.scalar(0); diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java index f82c718..60b3ec2 100644 --- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java @@ -15,17 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.exec.memory.AllocationManager.BufferLedger; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.util.DecimalUtility; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.NullableVectorDefinitionSetter; - -import java.lang.Override; -import java.lang.UnsupportedOperationException; -import java.util.Set; - <@pp.dropOutputFile /> <#list vv.types as type> <#list type.minor as minor> @@ -41,6 +30,7 @@ import java.util.Set; package org.apache.drill.exec.vector; <#include "/@includes/vv_imports.ftl" /> +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; /** * Nullable${minor.class} implements a vector of values which could be null. 
Elements in the vector @@ -457,6 +447,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type mutator.exchange(other.getMutator()); } + @Override + public void finalizeLastSet(int count) { + <#if type.major = "VarLen"> + mutator.lastSet = count; + </#if> + } + <#if type.major != "VarLen"> @Override public void toNullable(ValueVector nullableVector) { @@ -756,6 +753,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type values.getMutator().setValueCount(valueCount); bits.getMutator().setValueCount(valueCount); } + <#if type.major == "VarLen"> /** Enables this wrapper container class to participate in bulk mutator logic */ private final class VarLenBulkInputCallbackImpl implements VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> { @@ -849,6 +847,11 @@ public final class ${className} extends BaseDataValueVector implements <#if type <#if type.major = "VarLen">lastSet = -1;</#if> } + <#if type.major = "VarLen"> + @VisibleForTesting + public int getLastSet() { return lastSet; } + + </#if> // For nullable vectors, exchanging buffers (done elsewhere) // requires also exchanging mutator state (done here.) 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java index 80b732a..eed998a 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java @@ -27,4 +27,5 @@ public interface NullableVector extends ValueVector { ValueVector getBitsVector(); ValueVector getValuesVector(); + void finalizeLastSet(int count); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index b3c2ea5..be3a3e4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -70,12 +70,14 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl { } } + private final NullableVector nullableVector; private final UInt1ColumnWriter isSetWriter; private final BaseScalarWriter baseWriter; private ColumnWriterIndex writerIndex; public NullableScalarWriter(ColumnMetadata schema, NullableVector nullableVector, BaseScalarWriter baseWriter) { this.schema = schema; + this.nullableVector = nullableVector; isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector()); this.baseWriter = baseWriter; } @@ -273,6 +275,7 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl { // Avoid back-filling null values. baseWriter.skipNulls(); baseWriter.endWrite(); + nullableVector.finalizeLastSet(writerIndex.vectorIndex()); } @Override