Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/906#discussion_r135368337
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
---
@@ -39,88 +35,107 @@
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.resolver.TypeCastRules;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends
AbstractBinaryRecordBatch<UnionAll> {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
- private List<MaterializedField> outputFields;
+ private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private UnionAller unionall;
- private UnionAllInput unionAllInput;
- private RecordBatch current;
-
private final List<TransferPair> transfers = Lists.newArrayList();
- private List<ValueVector> allocationVectors;
- protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private List<ValueVector> allocationVectors = Lists.newArrayList();
private int recordCount = 0;
- private boolean schemaAvailable = false;
+ private UnionInputIterator unionInputIterator;
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children,
FragmentContext context) throws OutOfMemoryException {
- super(config, context, false);
- assert (children.size() == 2) : "The number of the operands of Union
must be 2";
- unionAllInput = new UnionAllInput(this, children.get(0),
children.get(1));
- }
-
- @Override
- public int getRecordCount() {
- return recordCount;
+ super(config, context, true, children.get(0), children.get(1));
}
@Override
protected void killIncoming(boolean sendUpstream) {
- unionAllInput.getLeftRecordBatch().kill(sendUpstream);
- unionAllInput.getRightRecordBatch().kill(sendUpstream);
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not
support selection vector");
- }
+ protected void buildSchema() throws SchemaChangeException {
+ if (! prefetchFirstBatchFromBothSides()) {
+ return;
+ }
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not
support selection vector");
+ unionInputIterator = new UnionInputIterator(leftUpstream, left,
rightUpstream, right);
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream ==
IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide(right.getSchema());
+ } else if (rightUpstream == IterOutcome.NONE && leftUpstream ==
IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide((left.getSchema()));
+ } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream
== IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+ }
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ for (VectorWrapper vv: container) {
+ vv.getValueVector().allocateNew();
+ vv.getValueVector().getMutator().setValueCount(0);
+ }
}
@Override
public IterOutcome innerNext() {
try {
- IterOutcome upstream = unionAllInput.nextBatch();
- logger.debug("Upstream of Union-All: {}", upstream);
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+
+ Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+
+ IterOutcome upstream = nextBatch.left;
+ RecordBatch incoming = nextBatch.right;
+
+ // skip batches with same schema as the previous one yet having 0
row.
+ if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
+ do {
+ for (final VectorWrapper<?> w : incoming) {
+ w.clear();
+ }
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+ nextBatch = unionInputIterator.next();
+ upstream = nextBatch.left;
+ incoming = nextBatch.right;
+ } while ((upstream == IterOutcome.OK) &&
+ incoming.getRecordCount() == 0);
+ }
+
--- End diff --
The loop does not handle the `STOP` or `OUT_OF_MEMORY` outcomes.
The loop can be compressed to something like:
```
// Pull input batches until one is worth processing, the inputs are
// exhausted, or an input reports a terminal status.
IterOutcome upstream;
RecordBatch incoming;
for (;;) {
  // Another input batch?
  if (!unionInputIterator.hasNext()) {
    return IterOutcome.NONE;
  }
  // Fetch the next (outcome, batch) pair. Assign to the variables declared
  // above rather than redeclaring them, so they remain visible after the loop.
  Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
  upstream = nextBatch.left;
  incoming = nextBatch.right;
  // Handle error conditions
  switch (upstream) {
    case NONE: // Can this actually occur?
    case OUT_OF_MEMORY:
    case STOP:
      return upstream;
    default:
      // Any other outcome falls through to the empty-batch check below.
      break;
  }
  // Skip batches with the same schema as the previous one yet having 0 rows.
  if (upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
    VectorAccessibleUtilities.clear(incoming);
    continue;
  }
  // Have a batch
  break;
}
```
The above can be simplified a bit, but should convey the idea.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---