Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/156#discussion_r39464031
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 ---
    @@ -176,81 +177,103 @@ protected void doOnMatch(RelOptRuleCall call, 
DrillFilterRel filterRel, DrillPro
         RexNode pruneCondition = c.getFinalCondition();
     
         if (pruneCondition == null) {
    +      logger.debug("No conditions were found eligible for partition 
pruning.");
           return;
         }
     
     
         // set up the partitions
    -    final GroupScan groupScan = scanRel.getGroupScan();
    -    List<PartitionLocation> partitions = descriptor.getPartitions();
    -
    -    if (partitions.size() > Character.MAX_VALUE) {
    -      return;
    -    }
    -
    -    final NullableBitVector output = new 
NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), 
allocator);
    -    final VectorContainer container = new VectorContainer();
    -
    -    try {
    -      final ValueVector[] vectors = new 
ValueVector[descriptor.getMaxHierarchyLevel()];
    -      for (int partitionColumnIndex : 
BitSets.toIter(partitionColumnBitSet)) {
    -        SchemaPath column = 
SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    -        MajorType type = descriptor.getVectorType(column, settings);
    -        MaterializedField field = MaterializedField.create(column, type);
    -        ValueVector v = TypeHelper.getNewVector(field, allocator);
    -        v.allocateNew();
    -        vectors[partitionColumnIndex] = v;
    -        container.add(v);
    -      }
    -
    -      // populate partition vectors.
    -      descriptor.populatePartitionVectors(vectors, partitions, 
partitionColumnBitSet, fieldNameMap);
    -
    -      // materialize the expression
    -      logger.debug("Attempting to prune {}", pruneCondition);
    -      final LogicalExpression expr = DrillOptiq.toDrill(new 
DrillParseContext(settings), scanRel, pruneCondition);
    -      final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    -
    -      LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(expr, container, errors, 
optimizerContext.getFunctionRegistry());
    -      // Make sure pruneCondition's materialized expression is always of 
BitType, so that
    -      // it's same as the type of output vector.
    -      if (materializedExpr.getMajorType().getMode() == 
TypeProtos.DataMode.REQUIRED) {
    -        materializedExpr = 
ExpressionTreeMaterializer.convertToNullableType(
    -            materializedExpr,
    -            materializedExpr.getMajorType().getMinorType(),
    -            optimizerContext.getFunctionRegistry(),
    -            errors);
    +    List<String> newFiles = Lists.newArrayList();
    +    long numTotal = 0; // total number of partitions
    +    int batchIndex = 0;
    +    String firstLocation = null;
    +
    +    // Outer loop: iterate over a list of batches of PartitionLocations
    +    for (List<PartitionLocation> partitions : descriptor) {
    +      numTotal += partitions.size();
    +      logger.debug("Evaluating partition pruning for batch {}", 
batchIndex);
    +      if (batchIndex == 0) { // save the first location in case everything 
is pruned
    +        firstLocation = partitions.get(0).getEntirePartitionLocation();
           }
    +      final NullableBitVector output = new 
NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), 
allocator);
    +      final VectorContainer container = new VectorContainer();
    +
    +      try {
    +        final ValueVector[] vectors = new 
ValueVector[descriptor.getMaxHierarchyLevel()];
    +          for (int partitionColumnIndex : 
BitSets.toIter(partitionColumnBitSet)) {
    +          SchemaPath column = 
SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex));
    +          MajorType type = descriptor.getVectorType(column, settings);
    +          MaterializedField field = MaterializedField.create(column, type);
    +          ValueVector v = TypeHelper.getNewVector(field, allocator);
    +          v.allocateNew();
    +          vectors[partitionColumnIndex] = v;
    +          container.add(v);
    +        }
     
    -      if (errors.getErrorCount() != 0) {
    -        logger.warn("Failure while materializing expression [{}].  Errors: 
{}", expr, errors);
    -      }
    +        // populate partition vectors.
    +        descriptor.populatePartitionVectors(vectors, partitions, 
partitionColumnBitSet, fieldNameMap);
    +
    +        // materialize the expression
    +        logger.debug("Attempting to prune {}", pruneCondition);
    +        final LogicalExpression expr = DrillOptiq.toDrill(new 
DrillParseContext(settings), scanRel, pruneCondition);
    +        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
    +
    +        LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(expr, container, errors, 
optimizerContext.getFunctionRegistry());
    +        // Make sure pruneCondition's materialized expression is always of 
BitType, so that
    +        // it's same as the type of output vector.
    +        if (materializedExpr.getMajorType().getMode() == 
TypeProtos.DataMode.REQUIRED) {
    +          materializedExpr = 
ExpressionTreeMaterializer.convertToNullableType(
    +              materializedExpr,
    +              materializedExpr.getMajorType().getMinorType(),
    +              optimizerContext.getFunctionRegistry(),
    +              errors);
    +        }
     
    -      output.allocateNew(partitions.size());
    -      InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, 
container, output, materializedExpr);
    -      int record = 0;
    +        if (errors.getErrorCount() != 0) {
    +          logger.warn("Failure while materializing expression [{}].  
Errors: {}", expr, errors);
    +        }
     
    -      List<String> newFiles = Lists.newArrayList();
    -      for(PartitionLocation part: partitions){
    -        if(!output.getAccessor().isNull(record) && 
output.getAccessor().get(record) == 1){
    -          newFiles.add(part.getEntirePartitionLocation());
    +        output.allocateNew(partitions.size());
    +        InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, 
container, output, materializedExpr);
    +        int recordCount = 0;
    +        int qualifiedCount = 0;
    +
    +        // Inner loop: within each batch iterate over the 
PartitionLocations
    +        for(PartitionLocation part: partitions){
    +          if(!output.getAccessor().isNull(recordCount) && 
output.getAccessor().get(recordCount) == 1){
    +            newFiles.add(part.getEntirePartitionLocation());
    +            qualifiedCount++;
    +          }
    +          recordCount++;
    +        }
    +        logger.debug("Within batch {}: total records: {}, qualified 
records: {}", batchIndex, recordCount, qualifiedCount);
    +        batchIndex++;
    +      } catch (Exception e) {
    +        logger.warn("Exception while trying to prune partition.", e);
    --- End diff --
    
    If there is an Exception during partition pruning for one sublist, it 
seems we just log the error as a warning. That means the code could continue 
with the logic after the "for-loop". I feel that might produce an incorrect 
result, since the list of new partitions might be invalid. 
    
    Should we stop the execution of partition pruning, once an Exception is 
caught here?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to