This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ed3ad76cb7b4165911acdf5866b0b4e57cd2d680
Author: Paul Rogers <par0...@yahoo.com>
AuthorDate: Sun Oct 13 12:41:03 2019 -0700

    DRILL-7403: Validate batch checks, vector integrity in unit tests
    
    Enhances the existing record batch checks to check all the various
    batch record counts, and to more fully validate all vector types.
    
    This code revealed that virtually all record batches have
    problems: they either omit setting some record count or
    introduce some form of vector corruption.
    
    Since we want things to work as we make fixes, this change enables
    the checks for only one record batch: the "new" scan. Others are
    to come as they are fixed.
    
    closes #1871
---
 .../physical/impl/validate/BatchValidator.java     | 433 ++++++++++++++++-----
 .../validate/IteratorValidatorBatchIterator.java   |  38 +-
 .../apache/drill/exec/vector/VectorValidator.java  |   6 +-
 .../physical/impl/validate/TestBatchValidator.java | 118 +++---
 4 files changed, 416 insertions(+), 179 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 83923c9..dde6583 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -17,23 +17,26 @@
  */
 package org.apache.drill.exec.physical.impl.validate;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.IdentityHashMap;
+import java.util.Map;
 
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.FixedWidthVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.NullableVector;
-import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.RepeatedBitVector;
+import org.apache.drill.exec.vector.UInt1Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
-import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -45,103 +48,361 @@ import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike;
  */
 
 public class BatchValidator {
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(BatchValidator.class);
+  private static final Logger logger = LoggerFactory.getLogger(BatchValidator.class);
 
+  public static final boolean LOG_TO_STDOUT = true;
   public static final int MAX_ERRORS = 100;
 
-  private final int rowCount;
-  private final VectorAccessible batch;
-  private final List<String> errorList;
-  private int errorCount;
+  public interface ErrorReporter {
+    void error(String name, ValueVector vector, String msg);
+    void warn(String name, ValueVector vector, String msg);
+    void error(String msg);
+    int errorCount();
+  }
+
+  public abstract static class BaseErrorReporter implements ErrorReporter {
+
+    private final String opName;
+    private int errorCount;
+
+    public BaseErrorReporter(String opName) {
+      this.opName = opName;
+    }
+
+    protected boolean startError() {
+      if (errorCount == 0) {
+        warn("Found one or more vector errors from " + opName);
+      }
+      errorCount++;
+      if (errorCount > MAX_ERRORS) { // Report at most MAX_ERRORS messages.
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void error(String name, ValueVector vector, String msg) {
+      error(String.format("%s - %s: %s",
+            name, vector.getClass().getSimpleName(), msg));
+    }
+
+    @Override
+    public void warn(String name, ValueVector vector, String msg) {
+      warn(String.format("%s - %s: %s",
+            name, vector.getClass().getSimpleName(), msg));
+    }
 
-  public BatchValidator(VectorAccessible batch) {
-    rowCount = batch.getRecordCount();
-    this.batch = batch;
-    errorList = null;
+    public abstract void warn(String msg);
+
+    @Override
+    public int errorCount() { return errorCount; }
+  }
+
+  private static class StdOutReporter extends BaseErrorReporter {
+
+    public StdOutReporter(String opName) {
+      super(opName);
+    }
+
+    @Override
+    public void error(String msg) {
+      if (startError()) {
+        System.out.println(msg);
+      }
+    }
+
+    @Override
+    public void warn(String msg) {
+      System.out.println(msg);
+    }
   }
 
-  public BatchValidator(VectorAccessible batch, boolean captureErrors) {
-    rowCount = batch.getRecordCount();
-    this.batch = batch;
-    if (captureErrors) {
-      errorList = new ArrayList<>();
+  private static class LogReporter extends BaseErrorReporter {
+
+    public LogReporter(String opName) {
+      super(opName);
+    }
+
+    @Override
+    public void error(String msg) {
+      if (startError()) {
+        logger.error(msg);
+      }
+    }
+
+    @Override
+    public void warn(String msg) {
+      logger.warn(msg);
+    }
+  }
+
+  private enum CheckMode { COUNTS, ALL }
+
+  private static final Map<Class<? extends RecordBatch>, CheckMode> checkRules = buildRules();
+
+  private final ErrorReporter errorReporter;
+
+  public BatchValidator(ErrorReporter errorReporter) {
+    this.errorReporter = errorReporter;
+  }
+
+  /**
+   * At present, most operators will not pass the checks here. The following
+   * table identifies those that should be checked, and the degree of check.
+   * Over time, this table should include all operators, and thus become
+   * unnecessary.
+   */
+  private static Map<Class<? extends RecordBatch>, CheckMode> buildRules() {
+    final Map<Class<? extends RecordBatch>, CheckMode> rules = new IdentityHashMap<>();
+    //rules.put(OperatorRecordBatch.class, CheckMode.ALL);
+    return rules;
+  }
+
+  public static boolean validate(RecordBatch batch) {
+    final CheckMode checkMode = checkRules.get(batch.getClass());
+
+    // If no rule, don't check this batch.
+
+    if (checkMode == null) {
+
+      // As work proceeds, might want to log those batches not checked.
+      // For now, there are too many.
+
+      return true;
+    }
+
+    // All batches that do any checks will at least check counts.
+
+    final ErrorReporter reporter = errorReporter(batch);
+    final int rowCount = batch.getRecordCount();
+    int valueCount = rowCount;
+    final VectorContainer container = batch.getContainer();
+    if (!container.hasRecordCount()) {
+      reporter.error(String.format(
+          "%s: Container record count not set",
+          batch.getClass().getSimpleName()));
     } else {
-      errorList = null;
+      // Row count will = container count for most operators.
+      // Row count <= container count for the filter operator.
+
+      final int containerRowCount = container.getRecordCount();
+      valueCount = containerRowCount;
+      switch (batch.getSchema().getSelectionVectorMode()) {
+      case FOUR_BYTE:
+        final int sv4Count = batch.getSelectionVector4().getCount();
+        if (sv4Count != rowCount) {
+          reporter.error(String.format(
+              "Mismatch between %s record count = %d, SV4 record count = %d",
+              batch.getClass().getSimpleName(),
+              rowCount, sv4Count));
+        }
+        // TODO: Don't know how to check SV4 batch vectors; report count errors only.
+        return reporter.errorCount() == 0;
+      case TWO_BYTE:
+        final int sv2Count = batch.getSelectionVector2().getCount();
+        if (sv2Count != rowCount) {
+          reporter.error(String.format(
+              "Mismatch between %s record count = %d, SV2 record count = %d",
+              batch.getClass().getSimpleName(),
+              rowCount, sv2Count));
+        }
+        if (sv2Count > containerRowCount) {
+          reporter.error(String.format(
+              "Mismatch between %s container count = %d, SV2 record count = %d",
+              batch.getClass().getSimpleName(),
+              containerRowCount, sv2Count));
+        }
+        final int svTotalCount = batch.getSelectionVector2().getBatchActualRecordCount();
+        if (svTotalCount != containerRowCount) {
+          reporter.error(String.format(
+              "Mismatch between %s container count = %d, SV2 total count = %d",
+              batch.getClass().getSimpleName(),
+              containerRowCount, svTotalCount));
+        }
+        break;
+      default:
+        if (rowCount != containerRowCount) {
+          reporter.error(String.format(
+              "Mismatch between %s record count = %d, container record count = %d",
+              batch.getClass().getSimpleName(),
+              rowCount, containerRowCount));
+        }
+        break;
+      }
     }
+    if (checkMode == CheckMode.ALL) {
+      new BatchValidator(reporter).validateBatch(batch, valueCount);
+    }
+    return reporter.errorCount() == 0;
   }
 
-  public void validate() {
-    if (batch.getRecordCount() == 0) {
-      return;
+  public static boolean validate(VectorAccessible batch) {
+    final ErrorReporter reporter = errorReporter(batch);
+    new BatchValidator(reporter).validateBatch(batch, batch.getRecordCount());
+    return reporter.errorCount() == 0;
+  }
+
+  private static ErrorReporter errorReporter(VectorAccessible batch) {
+    final String opName = batch.getClass().getSimpleName();
+    if (LOG_TO_STDOUT) {
+      return new StdOutReporter(opName);
+    } else {
+      return new LogReporter(opName);
     }
-    for (VectorWrapper<? extends ValueVector> w : batch) {
-      validateWrapper(w);
+  }
+
+  public void validateBatch(VectorAccessible batch, int rowCount) {
+    for (final VectorWrapper<? extends ValueVector> w : batch) {
+      validateWrapper(rowCount, w);
     }
   }
 
-  private void validateWrapper(VectorWrapper<? extends ValueVector> w) {
+  private void validateWrapper(int rowCount, VectorWrapper<? extends ValueVector> w) {
     if (w instanceof SimpleVectorWrapper) {
-      validateVector(w.getValueVector());
+      validateVector(rowCount, w.getValueVector());
+    }
+  }
+
+  private void validateVector(int expectedCount, ValueVector vector) {
+    final int valueCount = vector.getAccessor().getValueCount();
+    if (valueCount != expectedCount) {
+      error(vector.getField().getName(), vector,
+          String.format("Row count = %d, but value count = %d",
+              expectedCount, valueCount));
     }
+    validateVector(vector.getField().getName(), vector);
   }
 
-  private void validateVector(ValueVector vector) {
-    String name = vector.getField().getName();
-    if (vector instanceof NullableVector) {
+  private void validateVector(String name, ValueVector vector) {
+    if (vector instanceof BitVector) {
+      validateBitVector(name, (BitVector) vector);
+    } else if (vector instanceof RepeatedBitVector) {
+      validateRepeatedBitVector(name, (RepeatedBitVector) vector);
+    } else if (vector instanceof NullableVector) {
       validateNullableVector(name, (NullableVector) vector);
     } else if (vector instanceof VariableWidthVector) {
-      validateVariableWidthVector(name, (VariableWidthVector) vector, rowCount);
+      validateVariableWidthVector(name, (VariableWidthVector) vector);
     } else if (vector instanceof FixedWidthVector) {
       validateFixedWidthVector(name, (FixedWidthVector) vector);
     } else if (vector instanceof BaseRepeatedValueVector) {
       validateRepeatedVector(name, (BaseRepeatedValueVector) vector);
     } else {
-      logger.debug("Don't know how to validate vector: " + name + " of class " + vector.getClass().getSimpleName());
+      logger.debug("Don't know how to validate vector: {}  of class {}",
+          name, vector.getClass().getSimpleName());
+    }
+  }
+
+  private void validateNullableVector(String name, NullableVector vector) {
+    final int outerCount = vector.getAccessor().getValueCount();
+    final ValueVector valuesVector = vector.getValuesVector();
+    final int valueCount = valuesVector.getAccessor().getValueCount();
+    if (valueCount != outerCount) {
+      error(name, vector, String.format(
+          "Outer value count = %d, but inner value count = %d",
+          outerCount, valueCount));
     }
+    verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
+    validateVector(name + "-values", valuesVector);
   }
 
-  private void validateVariableWidthVector(String name, VariableWidthVector vector, int entryCount) {
+  private void validateVariableWidthVector(String name, VariableWidthVector vector) {
 
     // Offsets are in the derived classes. Handle only VarChar for now.
 
     if (vector instanceof VarCharVector) {
-      validateVarCharVector(name, (VarCharVector) vector, entryCount);
+      validateVarCharVector(name, (VarCharVector) vector);
     } else {
-      logger.debug("Don't know how to validate vector: " + name + " of class " + vector.getClass().getSimpleName());
+      logger.debug("Don't know how to validate vector: {}  of class {}",
+          name, vector.getClass().getSimpleName());
     }
   }
 
-  private void validateVarCharVector(String name, VarCharVector vector, int entryCount) {
-//    int dataLength = vector.getAllocatedByteCount(); // Includes offsets and data.
-    int dataLength = vector.getBuffer().capacity();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), entryCount, dataLength);
+  private void validateVarCharVector(String name, VarCharVector vector) {
+    final int valueCount = vector.getAccessor().getValueCount();
+
+    // Offset checks are skipped for empty vectors because a large
+    // number of operators set up offset vectors wrongly.
+    if (valueCount == 0) {
+      return;
+    }
+
+    final int dataLength = vector.getBuffer().writerIndex();
+    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, valueCount, dataLength);
   }
 
   private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {
-
-    int dataLength = Integer.MAX_VALUE;
-    if (vector instanceof RepeatedVarCharVector) {
-      dataLength = ((RepeatedVarCharVector) vector).getOffsetVector().getValueCapacity();
-    } else if (vector instanceof RepeatedFixedWidthVectorLike) {
-      dataLength = ((BaseDataValueVector) vector.getDataVector()).getBuffer().capacity();
+    final ValueVector dataVector = vector.getDataVector();
+    final int dataLength = dataVector.getAccessor().getValueCount();
+    final int valueCount = vector.getAccessor().getValueCount();
+    final int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+        true, valueCount, dataLength);
+
+    if (dataLength != itemCount) {
+      error(name, vector, String.format(
+          "Data vector has %d values, but offset vector has %d values",
+          dataLength, itemCount));
     }
-    int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(), rowCount, dataLength);
 
     // Special handling of repeated VarChar vectors
     // The nested data vectors are not quite exactly like top-level vectors.
 
-    ValueVector dataVector = vector.getDataVector();
     if (dataVector instanceof VariableWidthVector) {
-      validateVariableWidthVector(name + "-data", (VariableWidthVector) dataVector, itemCount);
+      validateVariableWidthVector(name + "-data", (VariableWidthVector) dataVector);
+    }
+  }
+
+  private void validateRepeatedBitVector(String name, RepeatedBitVector vector) {
+    final int valueCount = vector.getAccessor().getValueCount();
+    final BitVector dataVector = vector.getDataVector();
+    final int bitCount = dataVector.getAccessor().getValueCount(); // Offsets index these bits.
+    final int elementCount = validateOffsetVector(name + "-offsets",
+        vector.getOffsetVector(), true, valueCount, bitCount);
+    if (bitCount != elementCount) {
+      error(name, vector, String.format(
+          "Bit vector has %d values, but offset vector labels %d values",
+          bitCount, elementCount));
+    }
+    validateBitVector(name + "-data", dataVector);
+  }
+
+  private void validateFixedWidthVector(String name, FixedWidthVector vector) {
+    // Not much to do. The only item to check is the vector
+    // count itself, which was already done. There is no inner
+    // structure to check.
+  }
+
+  private void validateBitVector(String name, BitVector vector) {
+    final BitVector.Accessor accessor = vector.getAccessor();
+    final int valueCount = accessor.getValueCount();
+    final int dataLength = vector.getBuffer().writerIndex();
+    final int expectedLength = BitVector.getSizeFromCount(valueCount);
+    if (dataLength != expectedLength) {
+      error(name, vector, String.format(
+          "Bit vector has %d values, buffer has length %d, expected %d",
+          valueCount, dataLength, expectedLength));
     }
   }
 
-  private int validateOffsetVector(String name, UInt4Vector offsetVector, int valueCount, int maxOffset) {
+  private int validateOffsetVector(String name, UInt4Vector offsetVector,
+      boolean repeated, int valueCount, int maxOffset) {
+    final UInt4Vector.Accessor accessor = offsetVector.getAccessor();
+    final int offsetCount = accessor.getValueCount();
+    // TODO: Disabled because a large number of operators
+    // set up offset vectors incorrectly.
+//    if (!repeated && offsetCount == 0) {
+//      System.out.println(String.format(
+//          "Offset vector for %s: [0] has length 0, expected 1+",
+//          name));
+//      return false;
+//    }
+    if ((valueCount == 0 && offsetCount > 1) || (valueCount > 0 && offsetCount != valueCount + 1)) {
+      error(name, offsetVector, String.format(
+          "Outer vector has %d values, but offset vector has %d, expected %d",
+          valueCount, offsetCount, valueCount + 1));
+    }
     if (valueCount == 0) {
       return 0;
     }
-    UInt4Vector.Accessor accessor = offsetVector.getAccessor();
 
     // First value must be zero in current version.
 
@@ -150,14 +411,16 @@ public class BatchValidator {
       error(name, offsetVector, "Offset (0) must be 0 but was " + prevOffset);
     }
 
-    // Note <= comparison: offset vectors have (n+1) entries.
-
-    for (int i = 1; i <= valueCount; i++) {
-      int offset = accessor.get(i);
+    for (int i = 1; i < offsetCount; i++) {
+      final int offset = accessor.get(i);
       if (offset < prevOffset) {
-        error(name, offsetVector, "Decreasing offsets at (" + (i-1) + ", " + i + ") = (" + prevOffset + ", " + offset + ")");
+        error(name, offsetVector, String.format(
+            "Offset vector [%d] contained %d, expected >= %d",
+            i, offset, prevOffset));
       } else if (offset > maxOffset) {
-        error(name, offsetVector, "Invalid offset at index " + i + " = " + offset + " exceeds maximum of " + maxOffset);
+        error(name, offsetVector, String.format(
+            "Invalid offset at index %d: %d exceeds maximum of %d",
+            i, offset, maxOffset));
       }
       prevOffset = offset;
     }
@@ -165,42 +428,28 @@ public class BatchValidator {
   }
 
   private void error(String name, ValueVector vector, String msg) {
-    if (errorCount == 0) {
-      logger.error("Found one or more vector errors from " + batch.getClass().getSimpleName());
-    }
-    errorCount++;
-    if (errorCount >= MAX_ERRORS) {
-      return;
-    }
-    String fullMsg = "Column " + name + " of type " + vector.getClass().getSimpleName( ) + ": " + msg;
-    logger.error(fullMsg);
-    if (errorList != null) {
-      errorList.add(fullMsg);
-    }
+    errorReporter.error(name, vector, msg);
   }
 
-  private void validateNullableVector(String name, NullableVector vector) {
-    // Can't validate at this time because the bits vector is in each
-    // generated subtype.
-
-    // Validate a VarChar vector because it is common.
-
-    if (vector instanceof NullableVarCharVector) {
-      VarCharVector values = ((NullableVarCharVector) vector).getValuesVector();
-      validateVarCharVector(name + "-values", values, rowCount);
+  private void verifyIsSetVector(ValueVector parent, UInt1Vector bv) {
+    final String name = String.format("%s (%s)-bits",
+        parent.getField().getName(),
+        parent.getClass().getSimpleName());
+    final int rowCount = parent.getAccessor().getValueCount();
+    final int bitCount = bv.getAccessor().getValueCount();
+    if (bitCount != rowCount) {
+      error(name, bv, String.format(
+          "Value count = %d, but bit count = %d",
+          rowCount, bitCount));
+    }
+    final UInt1Vector.Accessor ba = bv.getAccessor();
+    for (int i = 0; i < bitCount; i++) {
+      final int value = ba.get(i);
+      if (value != 0 && value != 1) {
+        error(name, bv, String.format(
+            "Bit vector[%d] = %d, expected 0 or 1",
+            i, value));
+      }
     }
   }
-
-  private void validateFixedWidthVector(String name, FixedWidthVector vector) {
-    // TODO Auto-generated method stub
-
-  }
-
-  /**
-   * Obtain the list of errors. For use in unit-testing this class.
-   * @return the list of errors found, or null if error capture was
-   * not enabled
-   */
-
-  public List<String> errors() { return errorList; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 5c70f5d..3e7ae23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -36,21 +36,23 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.VectorValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
 
-  static final boolean VALIDATE_VECTORS = false;
+  static final boolean VALIDATE_VECTORS = true;
 
-  /** For logging/debuggability only. */
+  /** For logging/debugability only. */
   private static volatile int instanceCount;
 
   /** @see org.apache.drill.exec.physical.config.IteratorValidator */
   private final boolean isRepeatable;
 
-  /** For logging/debuggability only. */
+  /** For logging/debugability only. */
   private final int instNum;
   {
     instNum = ++instanceCount;
@@ -62,24 +64,24 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
    */
   private final RecordBatch incoming;
 
-  /** Incoming batch's type (simple class name); for logging/debuggability
+  /** Incoming batch's type (simple class name); for logging/debugability
    *  only. */
   private final String batchTypeName;
 
   /** Exception state of incoming batch; last value thrown by its next()
    *  method. */
-  private Throwable exceptionState = null;
+  private Throwable exceptionState;
 
   /** Main state of incoming batch; last value returned by its next() method. */
-  private IterOutcome batchState = null;
+  private IterOutcome batchState;
 
   /** Last schema retrieved after OK_NEW_SCHEMA or OK from next().  Null if none
-   *  yet. Currently for logging/debuggability only. */
-  private BatchSchema lastSchema = null;
+   *  yet. Currently for logging/debugability only. */
+  private BatchSchema lastSchema;
 
   /** Last schema retrieved after OK_NEW_SCHEMA from next().  Null if none yet.
-   *  Currently for logging/debuggability only. */
-  private BatchSchema lastNewSchema = null;
+   *  Currently for logging/debugability only. */
+  private BatchSchema lastNewSchema;
 
   /**
    * {@link IterOutcome} return value sequence validation state.
@@ -342,8 +344,16 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   }
 
   private void validateBatch() {
-    if (validateBatches) {
-      new BatchValidator(incoming).validate();
+    if (validateBatches || VALIDATE_VECTORS) {
+      if (!BatchValidator.validate(incoming)) {
+        throw new IllegalStateException(
+            "Batch validation failed. Source operator: " +
+            incoming.getClass().getSimpleName());
+      }
+      // The following validation currently calculates and discards
+      // a hash code. Since it requires manual checking, it is
+      // disabled by default.
+      // VectorValidator.validate(incoming);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorValidator.java
index 1cabd80..17997fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorValidator.java
@@ -28,7 +28,7 @@ public class VectorValidator {
     SelectionVectorMode mode = batch.getSchema().getSelectionVectorMode();
     switch(mode) {
       case NONE: {
-        for (VectorWrapper w : batch) {
+        for (VectorWrapper<?> w : batch) {
           ValueVector v = w.getValueVector();
           for (int i = 0; i < count; i++) {
             Object obj = v.getAccessor().getObject(i);
@@ -40,7 +40,7 @@ public class VectorValidator {
         break;
       }
       case TWO_BYTE: {
-        for (VectorWrapper w : batch) {
+        for (VectorWrapper<?> w : batch) {
           ValueVector v = w.getValueVector();
           for (int i = 0; i < count; i++) {
             int index = batch.getSelectionVector2().getIndex(i);
@@ -53,7 +53,7 @@ public class VectorValidator {
         break;
       }
       case FOUR_BYTE: {
-        for (VectorWrapper w : batch) {
+        for (VectorWrapper<?> w : batch) {
           ValueVector[] vv = w.getValueVectors();
           for (int i = 0; i < count; i++) {
             int index = batch.getSelectionVector4().get(i);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index fd8c844..a30bda9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -19,13 +19,17 @@ package org.apache.drill.exec.physical.impl.validate;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -33,38 +37,37 @@ import org.apache.drill.exec.vector.RepeatedVarCharVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.test.BaseDirTestWatcher;
-import org.apache.drill.test.LogFixture;
-import org.apache.drill.test.OperatorFixture;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
+import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import ch.qos.logback.classic.Level;
+@Category(RowSetTests.class)
+public class TestBatchValidator extends SubOperatorTest {
 
-public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
+  public static class CapturingReporter implements BatchValidator.ErrorReporter {
 
-  protected static OperatorFixture fixture;
-  protected static LogFixture logFixture;
+    public final List<String> errors = new ArrayList<>();
 
-  @ClassRule
-  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+    @Override
+    public void error(String name, ValueVector vector, String msg) {
+      error(String.format("%s (%s): %s",
+          name, vector.getClass().getSimpleName(), msg));
+    }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    logFixture = LogFixture.builder()
-        .toConsole()
-        .logger(BatchValidator.class, Level.TRACE)
-        .build();
-    fixture = OperatorFixture.standardFixture(dirTestWatcher);
-  }
+    @Override
+    public void warn(String name, ValueVector vector, String msg) {
+      error(name, vector, msg);
+    }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    fixture.close();
-    logFixture.close();
+    @Override
+    public void error(String msg) {
+      errors.add(msg);
+    }
+
+    @Override
+    public int errorCount() {
+      return errors.size();
+    }
   }
 
   @Test
@@ -81,9 +84,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .addRow(40, 140)
         .build();
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    assertTrue(validator.errors().isEmpty());
+    assertTrue(BatchValidator.validate(batch.vectorAccessible()));
     batch.clear();
   }
 
@@ -101,9 +102,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .addRow("col4.1", "col4.2")
         .build();
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    assertTrue(validator.errors().isEmpty());
+    assertTrue(BatchValidator.validate(batch.vectorAccessible()));
     batch.clear();
   }
 
@@ -120,9 +119,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .addRow(intArray(4), strArray("dino"))
         .build();
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    assertTrue(validator.errors().isEmpty());
+    assertTrue(BatchValidator.validate(batch.vectorAccessible()));
     batch.clear();
   }
 
@@ -150,14 +147,19 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
 
     // Validator should catch the error.
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Decreasing offsets"));
+    checkForError(batch, BAD_OFFSETS);
     batch.clear();
   }
 
+  private static void checkForError(SingleRowSet batch, String expectedError) {
+    CapturingReporter cr = new CapturingReporter();
+    new BatchValidator(cr).validateBatch(batch.vectorAccessible(), batch.rowCount());
+    assertTrue("Expected at least one validation error", cr.errors.size() > 0);
+    Pattern p = Pattern.compile(expectedError);
+    Matcher m = p.matcher(cr.errors.get(0));
+    assertTrue("First error does not match " + expectedError + ": " + cr.errors, m.find());
+  }
+
   @Test
   public void testVariableCorruptFirst() {
     TupleMetadata schema = new SchemaBuilder()
@@ -174,11 +176,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
 
     // Validator should catch the error.
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Offset (0) must be 0"));
+    checkForError(batch, "Offset \\(0\\) must be 0");
     batch.clear();
   }
 
@@ -210,11 +208,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
 
     // Validator should catch the error.
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Decreasing offsets"));
+    checkForError(batch, BAD_OFFSETS);
     batch.clear();
   }
 
@@ -234,11 +228,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
 
     // Validator should catch the error.
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Decreasing offsets"));
+    checkForError(batch, "Invalid offset");
     batch.clear();
   }
 
@@ -258,14 +248,12 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
 
     // Validator should catch the error.
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Invalid offset"));
+    checkForError(batch, "Invalid offset");
     batch.clear();
   }
 
+  private static final String BAD_OFFSETS = "Offset vector .* contained \\d+, expected >= \\d+";
+
   @Test
   public void testRepeatedBadArrayOffset() {
     TupleMetadata schema = new SchemaBuilder()
@@ -284,11 +272,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
     UInt4Vector ov = vc.getOffsetVector();
     ov.getMutator().set(3, 1);
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Decreasing offsets"));
+    checkForError(batch, BAD_OFFSETS);
     batch.clear();
   }
 
@@ -311,11 +295,7 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
     UInt4Vector ov = vc.getOffsetVector();
     ov.getMutator().set(4, 100_000);
 
-    BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
-    validator.validate();
-    List<String> errors = validator.errors();
-    assertEquals(1, errors.size());
-    assertTrue(errors.get(0).contains("Invalid offset"));
+    checkForError(batch, "Invalid offset");
     batch.clear();
   }
 }

Reply via email to