Github user jinfengni commented on a diff in the pull request:
https://github.com/apache/drill/pull/906#discussion_r136143881
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
---
@@ -39,88 +35,107 @@
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.resolver.TypeCastRules;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends
AbstractBinaryRecordBatch<UnionAll> {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
- private List<MaterializedField> outputFields;
+ private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private UnionAller unionall;
- private UnionAllInput unionAllInput;
- private RecordBatch current;
-
private final List<TransferPair> transfers = Lists.newArrayList();
- private List<ValueVector> allocationVectors;
- protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private List<ValueVector> allocationVectors = Lists.newArrayList();
private int recordCount = 0;
- private boolean schemaAvailable = false;
+ private UnionInputIterator unionInputIterator;
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children,
FragmentContext context) throws OutOfMemoryException {
- super(config, context, false);
- assert (children.size() == 2) : "The number of the operands of Union
must be 2";
- unionAllInput = new UnionAllInput(this, children.get(0),
children.get(1));
- }
-
- @Override
- public int getRecordCount() {
- return recordCount;
+ super(config, context, true, children.get(0), children.get(1));
}
@Override
protected void killIncoming(boolean sendUpstream) {
- unionAllInput.getLeftRecordBatch().kill(sendUpstream);
- unionAllInput.getRightRecordBatch().kill(sendUpstream);
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not
support selection vector");
- }
+ protected void buildSchema() throws SchemaChangeException {
+ if (! prefetchFirstBatchFromBothSides()) {
+ return;
+ }
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not
support selection vector");
+ unionInputIterator = new UnionInputIterator(leftUpstream, left,
rightUpstream, right);
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream ==
IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide(right.getSchema());
+ } else if (rightUpstream == IterOutcome.NONE && leftUpstream ==
IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide((left.getSchema()));
+ } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream
== IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+ }
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ for (VectorWrapper vv: container) {
+ vv.getValueVector().allocateNew();
+ vv.getValueVector().getMutator().setValueCount(0);
+ }
}
@Override
public IterOutcome innerNext() {
try {
- IterOutcome upstream = unionAllInput.nextBatch();
- logger.debug("Upstream of Union-All: {}", upstream);
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+
+ Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+
+ IterOutcome upstream = nextBatch.left;
+ RecordBatch incoming = nextBatch.right;
+
+ // skip batches with same schema as the previous one yet having 0
row.
+ if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
+ do {
+ for (final VectorWrapper<?> w : incoming) {
+ w.clear();
+ }
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+ nextBatch = unionInputIterator.next();
+ upstream = nextBatch.left;
+ incoming = nextBatch.right;
+ } while ((upstream == IterOutcome.OK) &&
+ incoming.getRecordCount() == 0);
+ }
+
--- End diff --
The loop does not have to handle `STOP` or `OOM` as they are handled in
the `switch` statement. Previously, the loop was intended to handle the same
schema yet having row cases.
The revised patch adopted your suggestion, by putting everything in the
loop.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---