[ 
https://issues.apache.org/jira/browse/DRILL-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147691#comment-16147691
 ] 

ASF GitHub Bot commented on DRILL-5546:
---------------------------------------

Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r136143881
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 ---
    @@ -39,88 +35,107 @@
     import org.apache.drill.exec.expr.ValueVectorWriteExpression;
     import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.config.UnionAll;
    -import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
     import org.apache.drill.exec.record.BatchSchema;
     import org.apache.drill.exec.record.MaterializedField;
     import org.apache.drill.exec.record.RecordBatch;
     import org.apache.drill.exec.record.TransferPair;
     import org.apache.drill.exec.record.TypedFieldId;
     import org.apache.drill.exec.record.VectorWrapper;
    -import org.apache.drill.exec.record.WritableBatch;
    -import org.apache.drill.exec.record.selection.SelectionVector2;
    -import org.apache.drill.exec.record.selection.SelectionVector4;
     import org.apache.drill.exec.resolver.TypeCastRules;
     import org.apache.drill.exec.vector.AllocationHelper;
     import org.apache.drill.exec.vector.FixedWidthVector;
     import org.apache.drill.exec.vector.SchemaChangeCallBack;
     import org.apache.drill.exec.vector.ValueVector;
     
    -import com.google.common.collect.Lists;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Stack;
     
    -public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
    +public class UnionAllRecordBatch extends 
AbstractBinaryRecordBatch<UnionAll> {
       private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
     
    -  private List<MaterializedField> outputFields;
    +  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
       private UnionAller unionall;
    -  private UnionAllInput unionAllInput;
    -  private RecordBatch current;
    -
       private final List<TransferPair> transfers = Lists.newArrayList();
    -  private List<ValueVector> allocationVectors;
    -  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
    +  private List<ValueVector> allocationVectors = Lists.newArrayList();
       private int recordCount = 0;
    -  private boolean schemaAvailable = false;
    +  private UnionInputIterator unionInputIterator;
     
       public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, 
FragmentContext context) throws OutOfMemoryException {
    -    super(config, context, false);
    -    assert (children.size() == 2) : "The number of the operands of Union 
must be 2";
    -    unionAllInput = new UnionAllInput(this, children.get(0), 
children.get(1));
    -  }
    -
    -  @Override
    -  public int getRecordCount() {
    -    return recordCount;
    +    super(config, context, true, children.get(0), children.get(1));
       }
     
       @Override
       protected void killIncoming(boolean sendUpstream) {
    -    unionAllInput.getLeftRecordBatch().kill(sendUpstream);
    -    unionAllInput.getRightRecordBatch().kill(sendUpstream);
    +    left.kill(sendUpstream);
    +    right.kill(sendUpstream);
       }
     
    -  @Override
    -  public SelectionVector2 getSelectionVector2() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
    -  }
    +  protected void buildSchema() throws SchemaChangeException {
    +    if (! prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
     
    -  @Override
    -  public SelectionVector4 getSelectionVector4() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
    +    unionInputIterator = new UnionInputIterator(leftUpstream, left, 
rightUpstream, right);
    +
    +    if (leftUpstream == IterOutcome.NONE && rightUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsOneSide(right.getSchema());
    +    } else if (rightUpstream == IterOutcome.NONE && leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsOneSide((left.getSchema()));
    +    } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream 
== IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
    +    }
    +
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    for (VectorWrapper vv: container) {
    +      vv.getValueVector().allocateNew();
    +      vv.getValueVector().getMutator().setValueCount(0);
    +    }
       }
     
       @Override
       public IterOutcome innerNext() {
         try {
    -      IterOutcome upstream = unionAllInput.nextBatch();
    -      logger.debug("Upstream of Union-All: {}", upstream);
    +      if (!unionInputIterator.hasNext()) {
    +        return IterOutcome.NONE;
    +      }
    +
    +      Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
    +
    +      IterOutcome upstream = nextBatch.left;
    +      RecordBatch incoming = nextBatch.right;
    +
    +      // skip batches with same schema as the previous one yet having 0 
row.
    +      if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
    +        do {
    +          for (final VectorWrapper<?> w : incoming) {
    +            w.clear();
    +          }
    +          if (!unionInputIterator.hasNext()) {
    +            return IterOutcome.NONE;
    +          }
    +          nextBatch = unionInputIterator.next();
    +          upstream = nextBatch.left;
    +          incoming = nextBatch.right;
    +        } while ((upstream == IterOutcome.OK) &&
    +                incoming.getRecordCount() == 0);
    +      }
    +
    --- End diff --
    
    The  loop does not have to handle `STOP` or `OOM` as they are handled in 
the `switch` statement.  Previously, the loop was intended to handle the same 
schema yet having row cases.
    
    The revised patch adopted your suggestion, by putting everything in the 
loop. 



> Schema change problems caused by empty batch
> --------------------------------------------
>
>                 Key: DRILL-5546
>                 URL: https://issues.apache.org/jira/browse/DRILL-5546
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Jinfeng Ni
>            Assignee: Jinfeng Ni
>
> There have been a few JIRAs opened related to schema change failure caused by 
> empty batch. This JIRA is opened as an umbrella for all those related JIRAS ( 
> such as DRILL-4686, DRILL-4734, DRILL4476, DRILL-4255, etc).
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to