This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ad75bbd9a300771e2b1d40f29876c449e9113b7c
Author: Paul Rogers <par0...@yahoo.com>
AuthorDate: Sun Oct 20 14:09:26 2019 -0700

    DRILL-7414: EVF incorrectly sets buffer writer index after rollover
    
    Enabling the vector validator on the "new" scan operator, in cases
    in which overflow occurs, identified that the DrillBuf writer index
    was not properly set for repeated vectors.
    
    Enables such checking, adds unit tests, and fixes the writer index
    issue.
    
    closes #1878
---
 .../physical/impl/validate/BatchValidator.java     |  79 ++---
 .../resultSet/impl/RepeatedVectorState.java        |  10 +-
 .../physical/resultSet/impl/SingleVectorState.java |  14 +-
 .../drill/exec/physical/impl/MockRecordBatch.java  |  84 +++---
 .../impl/TestResultSetLoaderOverflow.java          | 334 ++++++++++++---------
 .../accessor/writer/OffsetVectorWriterImpl.java    |   7 +-
 6 files changed, 292 insertions(+), 236 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index dde6583..2753f55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.validate;
 import java.util.IdentityHashMap;
 import java.util.Map;
 
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -153,13 +154,13 @@ public class BatchValidator {
    * unnecessary.
    */
   private static Map<Class<? extends RecordBatch>, CheckMode> buildRules() {
-    final Map<Class<? extends RecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    //rules.put(OperatorRecordBatch.class, CheckMode.ALL);
+    Map<Class<? extends RecordBatch>, CheckMode> rules = new IdentityHashMap<>();
+    rules.put(OperatorRecordBatch.class, CheckMode.ALL);
     return rules;
   }
 
   public static boolean validate(RecordBatch batch) {
-    final CheckMode checkMode = checkRules.get(batch.getClass());
+    CheckMode checkMode = checkRules.get(batch.getClass());
 
     // If no rule, don't check this batch.
 
@@ -173,10 +174,10 @@ public class BatchValidator {
 
     // All batches that do any checks will at least check counts.
 
-    final ErrorReporter reporter = errorReporter(batch);
-    final int rowCount = batch.getRecordCount();
+    ErrorReporter reporter = errorReporter(batch);
+    int rowCount = batch.getRecordCount();
     int valueCount = rowCount;
-    final VectorContainer container = batch.getContainer();
+    VectorContainer container = batch.getContainer();
     if (!container.hasRecordCount()) {
       reporter.error(String.format(
           "%s: Container record count not set",
@@ -185,11 +186,11 @@ public class BatchValidator {
       // Row count will = container count for most operators.
       // Row count <= container count for the filter operator.
 
-      final int containerRowCount = container.getRecordCount();
+      int containerRowCount = container.getRecordCount();
       valueCount = containerRowCount;
       switch (batch.getSchema().getSelectionVectorMode()) {
       case FOUR_BYTE:
-        final int sv4Count = batch.getSelectionVector4().getCount();
+        int sv4Count = batch.getSelectionVector4().getCount();
         if (sv4Count != rowCount) {
           reporter.error(String.format(
               "Mismatch between %s record count = %d, SV4 record count = %d",
@@ -199,7 +200,7 @@ public class BatchValidator {
         // TODO: Don't know how to check SV4 batches
         return true;
       case TWO_BYTE:
-        final int sv2Count = batch.getSelectionVector2().getCount();
+        int sv2Count = batch.getSelectionVector2().getCount();
         if (sv2Count != rowCount) {
           reporter.error(String.format(
               "Mismatch between %s record count = %d, SV2 record count = %d",
@@ -212,7 +213,7 @@ public class BatchValidator {
               batch.getClass().getSimpleName(),
               containerRowCount, sv2Count));
         }
-        final int svTotalCount = batch.getSelectionVector2().getBatchActualRecordCount();
+        int svTotalCount = batch.getSelectionVector2().getBatchActualRecordCount();
         if (svTotalCount != containerRowCount) {
           reporter.error(String.format(
               "Mismatch between %s container count = %d, SV2 total count = %d",
@@ -237,13 +238,13 @@ public class BatchValidator {
   }
 
   public static boolean validate(VectorAccessible batch) {
-    final ErrorReporter reporter = errorReporter(batch);
+    ErrorReporter reporter = errorReporter(batch);
     new BatchValidator(reporter).validateBatch(batch, batch.getRecordCount());
     return reporter.errorCount() == 0;
   }
 
   private static ErrorReporter errorReporter(VectorAccessible batch) {
-    final String opName = batch.getClass().getSimpleName();
+    String opName = batch.getClass().getSimpleName();
     if (LOG_TO_STDOUT) {
       return new StdOutReporter(opName);
     } else {
@@ -252,7 +253,7 @@ public class BatchValidator {
   }
 
   public void validateBatch(VectorAccessible batch, int rowCount) {
-    for (final VectorWrapper<? extends ValueVector> w : batch) {
+    for (VectorWrapper<? extends ValueVector> w : batch) {
       validateWrapper(rowCount, w);
     }
   }
@@ -264,7 +265,7 @@ public class BatchValidator {
   }
 
   private void validateVector(int expectedCount, ValueVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
     if (valueCount != expectedCount) {
       error(vector.getField().getName(), vector,
           String.format("Row count = %d, but value count = %d",
@@ -293,9 +294,9 @@ public class BatchValidator {
   }
 
   private void validateNullableVector(String name, NullableVector vector) {
-    final int outerCount = vector.getAccessor().getValueCount();
-    final ValueVector valuesVector = vector.getValuesVector();
-    final int valueCount = valuesVector.getAccessor().getValueCount();
+    int outerCount = vector.getAccessor().getValueCount();
+    ValueVector valuesVector = vector.getValuesVector();
+    int valueCount = valuesVector.getAccessor().getValueCount();
     if (valueCount != outerCount) {
       error(name, vector, String.format(
           "Outer value count = %d, but inner value count = %d",
@@ -318,7 +319,7 @@ public class BatchValidator {
   }
 
   private void validateVarCharVector(String name, VarCharVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
 
     // Disabled because a large number of operators
     // set up offset vectors wrongly.
@@ -326,15 +327,15 @@ public class BatchValidator {
       return;
     }
 
-    final int dataLength = vector.getBuffer().writerIndex();
+    int dataLength = vector.getBuffer().writerIndex();
     validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, valueCount, dataLength);
   }
 
   private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {
-    final ValueVector dataVector = vector.getDataVector();
-    final int dataLength = dataVector.getAccessor().getValueCount();
-    final int valueCount = vector.getAccessor().getValueCount();
-    final int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+    ValueVector dataVector = vector.getDataVector();
+    int dataLength = dataVector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
+    int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
         true, valueCount, dataLength);
 
     if (dataLength != itemCount) {
@@ -352,11 +353,11 @@ public class BatchValidator {
   }
 
   private void validateRepeatedBitVector(String name, RepeatedBitVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
-    final int maxBitCount = valueCount * 8;
-    final int elementCount = validateOffsetVector(name + "-offsets",
+    int valueCount = vector.getAccessor().getValueCount();
+    int maxBitCount = valueCount * 8;
+    int elementCount = validateOffsetVector(name + "-offsets",
         vector.getOffsetVector(), true, valueCount, maxBitCount);
-    final BitVector dataVector = vector.getDataVector();
+    BitVector dataVector = vector.getDataVector();
     if (dataVector.getAccessor().getValueCount() != elementCount) {
       error(name, vector, String.format(
           "Bit vector has %d values, but offset vector labels %d values",
@@ -372,10 +373,10 @@ public class BatchValidator {
   }
 
   private void validateBitVector(String name, BitVector vector) {
-    final BitVector.Accessor accessor = vector.getAccessor();
-    final int valueCount = accessor.getValueCount();
-    final int dataLength = vector.getBuffer().writerIndex();
-    final int expectedLength = BitVector.getSizeFromCount(valueCount);
+    BitVector.Accessor accessor = vector.getAccessor();
+    int valueCount = accessor.getValueCount();
+    int dataLength = vector.getBuffer().writerIndex();
+    int expectedLength = BitVector.getSizeFromCount(valueCount);
     if (dataLength != expectedLength) {
       error(name, vector, String.format(
           "Bit vector has %d values, buffer has length %d, expected %d",
@@ -385,8 +386,8 @@ public class BatchValidator {
 
   private int validateOffsetVector(String name, UInt4Vector offsetVector,
       boolean repeated, int valueCount, int maxOffset) {
-    final UInt4Vector.Accessor accessor = offsetVector.getAccessor();
-    final int offsetCount = accessor.getValueCount();
+    UInt4Vector.Accessor accessor = offsetVector.getAccessor();
+    int offsetCount = accessor.getValueCount();
     // TODO: Disabled because a large number of operators
     // set up offset vectors incorrectly.
 //    if (!repeated && offsetCount == 0) {
@@ -412,7 +413,7 @@ public class BatchValidator {
     }
 
     for (int i = 1; i < offsetCount; i++) {
-      final int offset = accessor.get(i);
+      int offset = accessor.get(i);
       if (offset < prevOffset) {
         error(name, offsetVector, String.format(
             "Offset vector [%d] contained %d, expected >= %d",
@@ -432,19 +433,19 @@ public class BatchValidator {
   }
 
   private void verifyIsSetVector(ValueVector parent, UInt1Vector bv) {
-    final String name = String.format("%s (%s)-bits",
+    String name = String.format("%s (%s)-bits",
         parent.getField().getName(),
         parent.getClass().getSimpleName());
-    final int rowCount = parent.getAccessor().getValueCount();
-    final int bitCount = bv.getAccessor().getValueCount();
+    int rowCount = parent.getAccessor().getValueCount();
+    int bitCount = bv.getAccessor().getValueCount();
     if (bitCount != rowCount) {
       error(name, bv, String.format(
           "Value count = %d, but bit count = %d",
           rowCount, bitCount));
     }
-    final UInt1Vector.Accessor ba = bv.getAccessor();
+    UInt1Vector.Accessor ba = bv.getAccessor();
     for (int i = 0; i < bitCount; i++) {
-      final int value = ba.get(i);
+      int value = ba.get(i);
       if (value != 0 && value != 1) {
         error(name, bv, String.format(
             "Bit vector[%d] = %d, expected 0 or 1",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
index 5fffe40..8688add 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
@@ -47,7 +47,7 @@ public class RepeatedVectorState implements VectorState {
     // vector, and the scalar (value) portion of the array writer.
 
     arrayWriter = (AbstractArrayWriter) writer;
-    AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events();
+    final AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events();
     valuesState = SimpleVectorState.vectorState(writer.schema(), colWriter, vector.getDataVector());
 
     // Create the offsets state with the offset vector portion of the repeated
@@ -116,19 +116,19 @@ public class RepeatedVectorState implements VectorState {
    * after the values copied during roll-over.</li>
    * </ul>
    *
-   * @param cardinality the number of outer elements to create in the look-ahead
+   * @param newCardinality the number of outer elements to create in the look-ahead
    * vector
    */
 
   @Override
-  public void rollover(int cardinality) {
+  public void rollover(int newCardinality) {
 
     // Swap out the two vectors. The index presented to the caller
     // is that of the data vector: the next position in the data
     // vector to be set into the data vector writer index.
 
-    valuesState.rollover(childCardinality(cardinality));
-    offsetsState.rollover(cardinality);
+    valuesState.rollover(childCardinality(newCardinality));
+    offsetsState.rollover(newCardinality);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
index faf2031..60fb7f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
@@ -123,7 +123,7 @@ public abstract class SingleVectorState implements VectorState {
 
       // Cap the allocated size to the maximum.
 
-      final int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
+      int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
       ((VariableWidthVector) vector).allocateNew(size, cardinality);
       return vector.getAllocatedSize();
     }
@@ -199,9 +199,9 @@ public abstract class SingleVectorState implements VectorState {
       // for the current row. We must subtract that offset from each copied
       // value to adjust the offset for the destination.
 
-      final UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
-      final UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
-      final int offset = childWriter.rowStartIndex();
+      UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
+      UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
+      int offset = childWriter.rowStartIndex();
       int newIndex = 1;
       ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
           Math.max(0, sourceEndIndex - sourceStartIndex + 1),
@@ -261,13 +261,13 @@ public abstract class SingleVectorState implements VectorState {
   @Override
   public void rollover(int cardinality) {
 
-    final int sourceStartIndex = writer.rowStartIndex();
+    int sourceStartIndex = writer.rowStartIndex();
 
     // Remember the last write index for the original vector.
     // This tells us the end of the set of values to move, while the
     // sourceStartIndex above tells us the start.
 
-    final int sourceEndIndex = writer.lastWriteIndex();
+    int sourceEndIndex = writer.lastWriteIndex();
 
     // Switch buffers between the backup vector and the writer's output
     // vector. Done this way because writers are bound to vectors and
@@ -306,7 +306,7 @@ public abstract class SingleVectorState implements VectorState {
    */
 
   protected static MajorType parseVectorType(ValueVector vector) {
-    final MajorType purportedType = vector.getField().getType();
+    MajorType purportedType = vector.getField().getType();
     if (purportedType.getMode() != DataMode.OPTIONAL) {
       return purportedType;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index f803ef4..51e47ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -17,11 +17,22 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -30,20 +41,9 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-
-import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class MockRecordBatch implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
@@ -61,12 +61,12 @@ public class MockRecordBatch implements CloseableRecordBatch {
   protected final OperatorContext oContext;
   protected final BufferAllocator allocator;
 
-  private MockRecordBatch(@NotNull final FragmentContext context,
-                          @Nullable final OperatorContext oContext,
-                          @NotNull final List<RowSet> testRowSets,
-                          @NotNull final List<IterOutcome> iterOutcomes,
-                          @NotNull final BatchSchema schema,
-                          final boolean dummy) {
+  private MockRecordBatch(@NotNull FragmentContext context,
+                          @Nullable OperatorContext oContext,
+                          @NotNull List<RowSet> testRowSets,
+                          @NotNull List<IterOutcome> iterOutcomes,
+                          @NotNull BatchSchema schema,
+                          boolean dummy) {
     Preconditions.checkNotNull(testRowSets);
     Preconditions.checkNotNull(iterOutcomes);
     Preconditions.checkNotNull(schema);
@@ -83,11 +83,11 @@ public class MockRecordBatch implements CloseableRecordBatch {
   }
 
   @Deprecated
-  public MockRecordBatch(@Nullable final FragmentContext context,
-                         @Nullable final OperatorContext oContext,
-                         @NotNull final List<VectorContainer> testContainers,
-                         @NotNull final List<IterOutcome> iterOutcomes,
-                         final BatchSchema schema) {
+  public MockRecordBatch(@Nullable FragmentContext context,
+                         @Nullable OperatorContext oContext,
+                         @NotNull List<VectorContainer> testContainers,
+                         @NotNull List<IterOutcome> iterOutcomes,
+                         BatchSchema schema) {
     this(context,
          oContext,
          testContainers.stream().
@@ -99,12 +99,12 @@ public class MockRecordBatch implements CloseableRecordBatch {
   }
 
   @Deprecated
-  public MockRecordBatch(@Nullable final FragmentContext context,
-                         @Nullable final OperatorContext oContext,
-                         @NotNull final List<VectorContainer> testContainers,
-                         @NotNull final List<IterOutcome> iterOutcomes,
-                         @NotNull final List<SelectionVector2> selectionVector2s,
-                         final BatchSchema schema) {
+  public MockRecordBatch(@Nullable FragmentContext context,
+                         @Nullable OperatorContext oContext,
+                         @NotNull List<VectorContainer> testContainers,
+                         @NotNull List<IterOutcome> iterOutcomes,
+                         @NotNull List<SelectionVector2> selectionVector2s,
+                         BatchSchema schema) {
     this(context,
       oContext,
       new Supplier<List<RowSet>>() {
@@ -200,11 +200,11 @@ public class MockRecordBatch implements CloseableRecordBatch {
     IterOutcome currentOutcome;
 
     if (currentContainerIndex < rowSets.size()) {
-      final RowSet rowSet = rowSets.get(currentContainerIndex);
-      final VectorContainer input = rowSet.container();
+      RowSet rowSet = rowSets.get(currentContainerIndex);
+      VectorContainer input = rowSet.container();
       // We need to do this since the downstream operator expects vector reference to be same
       // after first next call in cases when schema is not changed
-      final BatchSchema inputSchema = input.getSchema();
+      BatchSchema inputSchema = input.getSchema();
       if (!container.getSchema().isEquivalent(inputSchema)) {
         container.clear();
         container = new VectorContainer(allocator, inputSchema);
@@ -217,7 +217,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
           if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
             container.setRecordCount(input.getRecordCount());
           }
-          final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
+          SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
 
           if (sv2 != null) {
             // Operators assume that new values for an Sv2 are transferred in.
@@ -307,23 +307,23 @@ public class MockRecordBatch implements CloseableRecordBatch {
     public Builder() {
     }
 
-    private Builder sendData(final RowSet rowSet, final IterOutcome outcome) {
+    private Builder sendData(RowSet rowSet, IterOutcome outcome) {
       Preconditions.checkState(batchSchema == null);
       rowSets.add(rowSet);
       iterOutcomes.add(outcome);
       return this;
     }
 
-    public Builder sendData(final RowSet rowSet) {
-      final IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
+    public Builder sendData(RowSet rowSet) {
+      IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
       return sendData(rowSet, outcome);
     }
 
-    public Builder sendDataWithNewSchema(final RowSet rowSet) {
+    public Builder sendDataWithNewSchema(RowSet rowSet) {
       return sendData(rowSet, IterOutcome.OK_NEW_SCHEMA);
     }
 
-    public Builder sendDataAndEmit(final RowSet rowSet) {
+    public Builder sendDataAndEmit(RowSet rowSet) {
       return sendData(rowSet, IterOutcome.EMIT);
     }
 
@@ -335,18 +335,18 @@ public class MockRecordBatch implements CloseableRecordBatch {
       return this;
     }
 
-    public Builder setSchema(final BatchSchema batchSchema) {
+    public Builder setSchema(BatchSchema batchSchema) {
       Preconditions.checkState(!rowSets.isEmpty());
       this.batchSchema = Preconditions.checkNotNull(batchSchema);
       return this;
     }
 
-    public Builder withOperatorContext(final OperatorContext oContext) {
+    public Builder withOperatorContext(OperatorContext oContext) {
       this.oContext = Preconditions.checkNotNull(oContext);
       return this;
     }
 
-    public MockRecordBatch build(final FragmentContext context) {
+    public MockRecordBatch build(FragmentContext context) {
       BatchSchema tempSchema = batchSchema;
 
       if (tempSchema == null && !rowSets.isEmpty()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
index a691b2f..7dae42c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
@@ -28,10 +28,14 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
@@ -40,8 +44,6 @@ import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -72,42 +74,51 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
     rsLoader.startBatch();
     byte[] value = new byte[512];
     Arrays.fill(value, (byte) 'X');
-    int count = 0;
-    while (! rootWriter.isFull()) {
-      rootWriter.start();
-      rootWriter.scalar(0).setBytes(value, value.length);
-      rootWriter.save();
-      count++;
-    }
 
     // Number of rows should be driven by vector size.
     // Our row count should include the overflow row
 
     int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
-    assertEquals(expectedCount + 1, count);
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        rootWriter.scalar(0).setBytes(value, value.length);
+        rootWriter.save();
+        count++;
+      }
 
-    // Loader's row count should include only "visible" rows
+      assertEquals(expectedCount + 1, count);
 
-    assertEquals(expectedCount, rootWriter.rowCount());
+      // Loader's row count should include only "visible" rows
 
-    // Total count should include invisible and look-ahead rows.
+      assertEquals(expectedCount, rootWriter.rowCount());
 
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      // Total count should include invisible and look-ahead rows.
 
-    // Result should exclude the overflow row
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    result.clear();
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
 
     // Next batch should start with the overflow row
 
-    rsLoader.startBatch();
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    result.clear();
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -135,41 +146,52 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
     rsLoader.startBatch();
     byte[] value = new byte[512];
     Arrays.fill(value, (byte) 'X');
-    int count = 0;
-    while (! rootWriter.isFull()) {
-      rootWriter.start();
-      rootWriter.scalar(0).setBytes(value, value.length);
-      rootWriter.save();
-      count++;
-    }
 
     // Our row count should include the overflow row
 
     int expectedCount = 8 * 1024 * 1024 / value.length;
-    assertEquals(expectedCount + 1, count);
 
-    // Loader's row count should include only "visible" rows
+    // First batch, with overflow
 
-    assertEquals(expectedCount, rootWriter.rowCount());
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        rootWriter.scalar(0).setBytes(value, value.length);
+        rootWriter.save();
+        count++;
+      }
+      assertEquals(expectedCount + 1, count);
 
-    // Total count should include invisible and look-ahead rows.
+      // Loader's row count should include only "visible" rows
 
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      assertEquals(expectedCount, rootWriter.rowCount());
 
-    // Result should exclude the overflow row
+      // Total count should include invisible and look-ahead rows.
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    result.clear();
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
 
     // Next batch should start with the overflow row
 
-    rsLoader.startBatch();
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    result.clear();
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -207,7 +229,9 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
 
     // Harvest the full batch
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
+    VectorContainer container = rsLoader.harvest();
+    BatchValidator.validate(container);
+    RowSet result = fixture.wrap(container);
     result.clear();
 
     // Close without harvesting the overflow batch.
@@ -279,73 +303,79 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
     byte[] value = new byte[473];
     Arrays.fill(value, (byte) 'X');
     String strValue = new String(value, Charsets.UTF_8);
-    int count = 0;
-    int rowSize = 0;
-    int totalSize = 0;
     int valuesPerArray = 13;
-    while (rootWriter.start()) {
-      totalSize += rowSize;
-      rowSize = 0;
-      ScalarWriter array = rootWriter.array(0).scalar();
-      for (int i = 0; i < valuesPerArray; i++) {
-        String cellValue = strValue + (count + 1) + "." + i;
-        array.setString(cellValue);
-        rowSize += cellValue.length();
+    int count = 0;
+
+    {
+      int rowSize = 0;
+      int totalSize = 0;
+      while (rootWriter.start()) {
+        totalSize += rowSize;
+        rowSize = 0;
+        ScalarWriter array = rootWriter.array(0).scalar();
+        for (int i = 0; i < valuesPerArray; i++) {
+          String cellValue = strValue + (count + 1) + "." + i;
+          array.setString(cellValue);
+          rowSize += cellValue.length();
+        }
+        rootWriter.save();
+        count++;
       }
-      rootWriter.save();
-      count++;
-    }
 
-    // Row count should include the overflow row.
+      // Row count should include the overflow row.
 
-    int expectedCount = count - 1;
+      int expectedCount = count - 1;
 
-    // Size without overflow row should fit in the vector, size
-    // with overflow should not.
+      // Size without overflow row should fit in the vector, size
+      // with overflow should not.
 
-    assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
-    assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
+      assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
+      assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
 
-    // Result should exclude the overflow row. Last row
-    // should hold the last full array.
+      // Result should exclude the overflow row. Last row
+      // should hold the last full array.
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    RowSetReader reader = result.reader();
-    reader.setPosition(expectedCount - 1);
-    ArrayReader arrayReader = reader.array(0);
-    ScalarReader strReader = arrayReader.scalar();
-    assertEquals(valuesPerArray, arrayReader.size());
-    for (int i = 0; i < valuesPerArray; i++) {
-      assertTrue(arrayReader.next());
-      String cellValue = strValue + (count - 1) + "." + i;
-      assertEquals(cellValue, strReader.getString());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      RowSetReader reader = result.reader();
+      reader.setPosition(expectedCount - 1);
+      ArrayReader arrayReader = reader.array(0);
+      ScalarReader strReader = arrayReader.scalar();
+      assertEquals(valuesPerArray, arrayReader.size());
+      for (int i = 0; i < valuesPerArray; i++) {
+        assertTrue(arrayReader.next());
+        String cellValue = strValue + (count - 1) + "." + i;
+        assertEquals(cellValue, strReader.getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     // Next batch should start with the overflow row.
     // The only row in this next batch should be the whole
     // array being written at the time of overflow.
 
-    rsLoader.startBatch();
-//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
-//    ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter());
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
-    assertEquals(1, result.rowCount());
-    reader = result.reader();
-    reader.next();
-    arrayReader = reader.array(0);
-    strReader = arrayReader.scalar();
-    assertEquals(valuesPerArray, arrayReader.size());
-    for (int i = 0; i < valuesPerArray; i++) {
-      assertTrue(arrayReader.next());
-      String cellValue = strValue + (count) + "." + i;
-      assertEquals(cellValue, strReader.getString());
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(count, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      RowSetReader reader = result.reader();
+      reader.next();
+      ArrayReader arrayReader = reader.array(0);
+      ScalarReader strReader = arrayReader.scalar();
+      assertEquals(valuesPerArray, arrayReader.size());
+      for (int i = 0; i < valuesPerArray; i++) {
+        assertTrue(arrayReader.next());
+        String cellValue = strValue + count + "." + i;
+        assertEquals(cellValue, strReader.getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
@@ -429,10 +459,11 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
 
     // Verify
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(count - 1, result.rowCount());
-
     {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(count - 1, result.rowCount());
       RowSetReader reader = result.reader();
       ArrayReader aArray = reader.array("a");
       ScalarReader aReader = aArray.scalar();
@@ -502,10 +533,11 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
       count++;
     }
 
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(6, result.rowCount());
-
     {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(6, result.rowCount());
       RowSetReader reader = result.reader();
       ArrayReader aArray = reader.array("a");
       ScalarReader aReader = aArray.scalar();
@@ -550,8 +582,8 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
         }
         j++;
       }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
@@ -624,7 +656,9 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
       rowId++;
     }
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
+    VectorContainer container = rsLoader.harvest();
+    BatchValidator.validate(container);
+    RowSet result = fixture.wrap(container);
     assertEquals(rowId - 1, result.rowCount());
     RowSetReader reader = result.reader();
     ArrayReader cArray = reader.array("c");
@@ -677,30 +711,38 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
 
     // Result should exclude the overflow row
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(count - 1, result.rowCount());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(count - 1, result.rowCount());
 
-    RowSetReader reader = result.reader();
-    while (reader.next()) {
-      assertEquals(reader.offset(), reader.scalar(0).getInt());
-      assertTrue(reader.scalar(1).isNull());
-      assertArrayEquals(value, reader.scalar(2).getBytes());
-      assertTrue(reader.scalar(3).isNull());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(reader.offset(), reader.scalar(0).getInt());
+        assertTrue(reader.scalar(1).isNull());
+        assertArrayEquals(value, reader.scalar(2).getBytes());
+        assertTrue(reader.scalar(3).isNull());
+      }
+      result.clear();
     }
-    result.clear();
 
     // Next batch should start with the overflow row
 
     rsLoader.startBatch();
-    result = fixture.wrap(rsLoader.harvest());
-    reader = result.reader();
-    assertEquals(1, result.rowCount());
-    assertTrue(reader.next());
-    assertEquals(count - 1, reader.scalar(0).getInt());
-    assertTrue(reader.scalar(1).isNull());
-    assertArrayEquals(value, reader.scalar(2).getBytes());
-    assertTrue(reader.scalar(3).isNull());
-    result.clear();
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      RowSetReader reader = result.reader();
+      assertEquals(1, result.rowCount());
+      assertTrue(reader.next());
+      assertEquals(count - 1, reader.scalar(0).getInt());
+      assertTrue(reader.scalar(1).isNull());
+      assertArrayEquals(value, reader.scalar(2).getBytes());
+      assertTrue(reader.scalar(3).isNull());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -721,6 +763,11 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
     byte[] head = "abc".getBytes();
     byte[] tail = new byte[523];
     Arrays.fill(tail, (byte) 'X');
+
+    String expected = new String(head, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+
     int count = 0;
     ScalarWriter colWriter = rootWriter.scalar(0);
     while (! rootWriter.isFull()) {
@@ -749,32 +796,37 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
 
     // Result should exclude the overflow row
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
 
-    // Verify that the values were, in fact, appended.
+      // Verify that the values were, in fact, appended.
 
-    String expected = new String(head, Charsets.UTF_8);
-    expected += new String(tail, Charsets.UTF_8);
-    expected += new String(tail, Charsets.UTF_8);
-    RowSetReader reader = result.reader();
-    while (reader.next()) {
-      assertEquals(expected, reader.scalar(0).getString());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(expected, reader.scalar(0).getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     // Next batch should start with the overflow row
 
     rsLoader.startBatch();
     assertEquals(1, rootWriter.rowCount());
     assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    reader = result.reader();
-    while (reader.next()) {
-      assertEquals(expected, reader.scalar(0).getString());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(expected, reader.scalar(0).getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 1da362a..9f0b1d5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -288,9 +288,12 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements
     // Rollover is occurring. This means the current row is not complete.
     // We want to keep 0..(row index - 1) which gives us (row index)
     // rows. But, this being an offset vector, we add one to account
-    // for the extra 0 value at the start.
+    // for the extra 0 value at the start. That is, we want to set
+    // the value count to the current row start index, which already
+    // is set to one past the index of the last zero-based index.
+    // (Offset vector indexes are confusing.)
 
-    setValueCount(vectorIndex.rowStartIndex() + 1);
+    setValueCount(vectorIndex.rowStartIndex());
   }
 
   @Override

Reply via email to