Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/906#discussion_r135369225
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---
    @@ -39,88 +35,107 @@
     import org.apache.drill.exec.expr.ValueVectorWriteExpression;
     import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.config.UnionAll;
    -import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
     import org.apache.drill.exec.record.BatchSchema;
     import org.apache.drill.exec.record.MaterializedField;
     import org.apache.drill.exec.record.RecordBatch;
     import org.apache.drill.exec.record.TransferPair;
     import org.apache.drill.exec.record.TypedFieldId;
     import org.apache.drill.exec.record.VectorWrapper;
    -import org.apache.drill.exec.record.WritableBatch;
    -import org.apache.drill.exec.record.selection.SelectionVector2;
    -import org.apache.drill.exec.record.selection.SelectionVector4;
     import org.apache.drill.exec.resolver.TypeCastRules;
     import org.apache.drill.exec.vector.AllocationHelper;
     import org.apache.drill.exec.vector.FixedWidthVector;
     import org.apache.drill.exec.vector.SchemaChangeCallBack;
     import org.apache.drill.exec.vector.ValueVector;
     
    -import com.google.common.collect.Lists;
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Stack;
     
    -public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
    +public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
       private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
     
    -  private List<MaterializedField> outputFields;
    +  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
       private UnionAller unionall;
    -  private UnionAllInput unionAllInput;
    -  private RecordBatch current;
    -
       private final List<TransferPair> transfers = Lists.newArrayList();
    -  private List<ValueVector> allocationVectors;
    -  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
    +  private List<ValueVector> allocationVectors = Lists.newArrayList();
       private int recordCount = 0;
    -  private boolean schemaAvailable = false;
    +  private UnionInputIterator unionInputIterator;
     
       public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
    -    super(config, context, false);
    -    assert (children.size() == 2) : "The number of the operands of Union must be 2";
    -    unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
    -  }
    -
    -  @Override
    -  public int getRecordCount() {
    -    return recordCount;
    +    super(config, context, true, children.get(0), children.get(1));
       }
     
       @Override
       protected void killIncoming(boolean sendUpstream) {
    -    unionAllInput.getLeftRecordBatch().kill(sendUpstream);
    -    unionAllInput.getRightRecordBatch().kill(sendUpstream);
    +    left.kill(sendUpstream);
    +    right.kill(sendUpstream);
       }
     
    -  @Override
    -  public SelectionVector2 getSelectionVector2() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
    -  }
    +  protected void buildSchema() throws SchemaChangeException {
    +    if (! prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
     
    -  @Override
    -  public SelectionVector4 getSelectionVector4() {
    -    throw new UnsupportedOperationException("UnionAllRecordBatch does not 
support selection vector");
    +    unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
    +
    +    if (leftUpstream == IterOutcome.NONE && rightUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsOneSide(right.getSchema());
    +    } else if (rightUpstream == IterOutcome.NONE && leftUpstream == 
IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsOneSide((left.getSchema()));
    +    } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream 
== IterOutcome.OK_NEW_SCHEMA) {
    +      inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
    +    }
    +
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    for (VectorWrapper vv: container) {
    +      vv.getValueVector().allocateNew();
    +      vv.getValueVector().getMutator().setValueCount(0);
    +    }
       }
     
       @Override
       public IterOutcome innerNext() {
         try {
    -      IterOutcome upstream = unionAllInput.nextBatch();
    -      logger.debug("Upstream of Union-All: {}", upstream);
    +      if (!unionInputIterator.hasNext()) {
    +        return IterOutcome.NONE;
    +      }
    +
    +      Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
    +
    +      IterOutcome upstream = nextBatch.left;
    +      RecordBatch incoming = nextBatch.right;
    +
    +      // skip batches with same schema as the previous one yet having 0 row.
    +      if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
    +        do {
    +          for (final VectorWrapper<?> w : incoming) {
    +            w.clear();
    +          }
    --- End diff --
    
    Consider replacing this loop with `VectorAccessibleUtilities.clear(incoming)`.
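    
    A minimal sketch of the suggested simplification, assuming the static helper `VectorAccessibleUtilities.clear(VectorAccessible)` (in `org.apache.drill.exec.record`) clears every value vector in a batch; the surrounding do-while logic would stay as is:
    
    ```java
    import org.apache.drill.exec.record.VectorAccessibleUtilities;
    import org.apache.drill.exec.record.VectorWrapper;
    
    // Instead of clearing each wrapper by hand ...
    for (final VectorWrapper<?> w : incoming) {
      w.clear();
    }
    
    // ... one utility call does the same work on the whole batch:
    VectorAccessibleUtilities.clear(incoming);
    ```
    
    That keeps the zero-row-batch skip readable and avoids repeating wrapper iteration that the helper already encapsulates.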

