[ https://issues.apache.org/jira/browse/DRILL-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142430#comment-16142430 ]
ASF GitHub Bot commented on DRILL-5546: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/906#discussion_r135369225 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java --- @@ -39,88 +35,107 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.UnionAll; -import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.resolver.TypeCastRules; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Stack; -public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { +public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class); - private List<MaterializedField> outputFields; + private SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private UnionAller unionall; - private 
UnionAllInput unionAllInput; - private RecordBatch current; - private final List<TransferPair> transfers = Lists.newArrayList(); - private List<ValueVector> allocationVectors; - protected SchemaChangeCallBack callBack = new SchemaChangeCallBack(); + private List<ValueVector> allocationVectors = Lists.newArrayList(); private int recordCount = 0; - private boolean schemaAvailable = false; + private UnionInputIterator unionInputIterator; public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException { - super(config, context, false); - assert (children.size() == 2) : "The number of the operands of Union must be 2"; - unionAllInput = new UnionAllInput(this, children.get(0), children.get(1)); - } - - @Override - public int getRecordCount() { - return recordCount; + super(config, context, true, children.get(0), children.get(1)); } @Override protected void killIncoming(boolean sendUpstream) { - unionAllInput.getLeftRecordBatch().kill(sendUpstream); - unionAllInput.getRightRecordBatch().kill(sendUpstream); + left.kill(sendUpstream); + right.kill(sendUpstream); } - @Override - public SelectionVector2 getSelectionVector2() { - throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector"); - } + protected void buildSchema() throws SchemaChangeException { + if (! 
prefetchFirstBatchFromBothSides()) { + return; + } - @Override - public SelectionVector4 getSelectionVector4() { - throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector"); + unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right); + + if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsOneSide(right.getSchema()); + } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsOneSide((left.getSchema())); + } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) { + inferOutputFieldsBothSide(left.getSchema(), right.getSchema()); + } + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + for (VectorWrapper vv: container) { + vv.getValueVector().allocateNew(); + vv.getValueVector().getMutator().setValueCount(0); + } } @Override public IterOutcome innerNext() { try { - IterOutcome upstream = unionAllInput.nextBatch(); - logger.debug("Upstream of Union-All: {}", upstream); + if (!unionInputIterator.hasNext()) { + return IterOutcome.NONE; + } + + Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next(); + + IterOutcome upstream = nextBatch.left; + RecordBatch incoming = nextBatch.right; + + // skip batches with same schema as the previous one yet having 0 row. + if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { + do { + for (final VectorWrapper<?> w : incoming) { + w.clear(); + } --- End diff -- `VectorAccessibleUtilities.clear(incoming)` > Schema change problems caused by empty batch > -------------------------------------------- > > Key: DRILL-5546 > URL: https://issues.apache.org/jira/browse/DRILL-5546 > Project: Apache Drill > Issue Type: Bug > Reporter: Jinfeng Ni > Assignee: Jinfeng Ni > > There have been a few JIRAs opened related to schema change failure caused by > empty batch. 
This JIRA is opened as an umbrella for all those related JIRAs ( > such as DRILL-4686, DRILL-4734, DRILL-4476, DRILL-4255, etc.). > -- This message was sent by Atlassian JIRA (v6.4.14#64029)