Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/1223#discussion_r182873443
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java ---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
+public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+ private Unnest unnest;
+ private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+                                       // to keep processing it. Kill may be called by a limit in a subquery that
+                                       // requires us to stop processing the current row, but not stop processing
+                                       // the data.
+ // In some cases we need to return a predetermined state from a call to next. These are:
+ // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+ // 2) Kill is called by LIMIT to stop processing of the current row. (This occurs when the LIMIT is part of a
+ //    subquery between UNNEST and LATERAL.) IterOutcome should be EMIT
+ // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+ private IterOutcome nextState = OK;
+ private int remainderIndex = 0;
+ private int recordCount;
+ private MaterializedField unnestFieldMetadata;
+ private final UnnestMemoryManager memoryManager;
+
+ public enum Metric implements MetricDef {
+ INPUT_BATCH_COUNT,
+ AVG_INPUT_BATCH_BYTES,
+ AVG_INPUT_ROW_BYTES,
+ INPUT_RECORD_COUNT,
+ OUTPUT_BATCH_COUNT,
+ AVG_OUTPUT_BATCH_BYTES,
+ AVG_OUTPUT_ROW_BYTES,
+ OUTPUT_RECORD_COUNT;
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
+ /**
+ * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten.
+ */
+ private class UnnestMemoryManager extends RecordBatchMemoryManager {
+
+ private UnnestMemoryManager(int outputBatchSize) {
+ super(outputBatchSize);
+ }
+
+ @Override
+ public void update() {
+ // Get sizing information for the batch.
+ setRecordBatchSizer(new RecordBatchSizer(incoming));
+
+ final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+
+ // Get column size of unnest column.
+ RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer
+     .getColumn(incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector(),
+         field.getName());
+
+ // Average rowWidth of a single element in the unnest list:
+ // subtract the offset vector size from the column data size.
+ final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
+     .safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()),
+         columnSize.getElementCount());
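+ // Worked example (hypothetical numbers): for a repeated-int column with totalNetSize = 4400 bytes,
+ // 100 incoming rows, 1000 list entries, and a 4-byte offset vector, the average entry width is
+ // (4400 - 4 * 100) / 1000 = 4 bytes.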
+
+ // Average rowWidth of outgoing batch.
+ final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry;
+
+ // Configured size (in bytes) of the outgoing batch.
+ final int outputBatchSize = getOutputBatchSize();
+ // Number of rows in the outgoing batch.
+ setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
+
+ setOutgoingRowWidth(avgOutgoingRowWidth);
+
+ // Limit to lower bound of total number of rows possible for this batch,
+ // i.e. all rows fit within memory budget.
+ setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
+
+ logger.debug("incoming batch size : {}", getRecordBatchSizer());
+
+ logger.debug("output batch size : {}, avg outgoing rowWidth : {},
output rowCount : {}",
+ outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+
+ updateIncomingStats();
+ }
+
+ }
+
+
+ public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException {
+ super(pop, context);
+ // get the output batch size from config.
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ memoryManager = new UnnestMemoryManager(configuredBatchSize);
+ }
+
+ @Override
+ public int getRecordCount() {
+ return recordCount;
+ }
+
+ protected void killIncoming(boolean sendUpstream) {
+ // Kill may be received from an operator downstream of the corresponding lateral, or from
+ // a limit that is in a subquery between unnest and lateral. In the latter case, unnest has to handle the limit.
+ // In the former case, Lateral will do most of the kill handling.
+
+ Preconditions.checkNotNull(lateral);
+ // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming.
+ if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) {
+ logger.debug("Kill received. Stopping all processing");
+ nextState = IterOutcome.NONE;
+ } else {
+ // if we have already processed the record, then kill from a limit has no meaning.
+ // if, however, we have values remaining to be emitted, and limit has been reached,
+ // we abandon the remainder and send an empty batch with EMIT.
+ logger.debug("Kill received from subquery. Stopping processing of current input row.");
+ if (hasRemainder) {
+ nextState = IterOutcome.EMIT;
+ }
+ }
+ hasRemainder = false; // whatever the case, we need to stop processing the current row.
+ }
+
+
+ @Override
+ public IterOutcome innerNext() {
+
+ Preconditions.checkNotNull(lateral);
+
+ // Short circuit if record batch has already sent all data and is done
+ if (state == BatchState.DONE) {
+ return IterOutcome.NONE;
+ }
+
+ if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
+ return nextState;
+ }
+
+ if (hasRemainder) {
+ return handleRemainder();
+ }
+
+ // We do not need to call next() unlike the other operators.
+ // When unnest's innerNext is called, the LateralJoin would have already
+ // updated the incoming vector.
+ // We do, however, need to call doWork() to do the actual work.
+ // We also need to handle schema build if it is the first batch
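+ // Illustrative call sequence: LateralJoin advances its row index and calls next() on unnest;
+ // unnest returns OK_NEW_SCHEMA for the first batch (or on a schema change), OK while output
+ // spills across batches, and EMIT once the current row is fully unnested.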
+
+ if (state == BatchState.FIRST) {
+ state = BatchState.NOT_FIRST;
+ try {
+ stats.startSetup();
+ hasRemainder = true; // next call to next will handle the actual data.
+ logger.debug("First batch received");
+ schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
+                  // current field metadata for checks in subsequent iterations
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ } finally {
+ stats.stopSetup();
+ }
+ return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
+ container.zeroVectors();
+
+ // Check if schema has changed
+ if (lateral.getRecordIndex() == 0 && schemaChanged()) {
+ hasRemainder = true; // next call to next will handle the actual data.
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ return OK_NEW_SCHEMA;
+ }
+ if (lateral.getRecordIndex() == 0) {
+ unnest.resetGroupIndex();
+ }
+ return doWork();
+ }
+
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ return this.container;
+ }
+
+ @SuppressWarnings("resource") private void setUnnestVector() {
+ final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+ final RepeatedValueVector vector;
+ final ValueVector inVV =
+     incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector();
+
+ if (!(inVV instanceof RepeatedValueVector)) {
+ if (incoming.getRecordCount() != 0) {
+ throw UserException.unsupportedError().message("Unnest does not support inputs of non-list values.")
+     .build(logger);
+ }
+ // Inherited from FLATTEN. When does this happen???
+ // when incoming recordCount is 0, don't throw an exception since the type being seen here is not solid
+ logger.error("setUnnestVector cast failed and recordCount is 0, create empty vector anyway.");
+ vector = new RepeatedMapVector(field, oContext.getAllocator(), null);
+ } else {
+ vector = RepeatedValueVector.class.cast(inVV);
+ }
+ unnest.setUnnestField(vector);
+ }
+
+ @Override
+ protected IterOutcome doWork() {
+ Preconditions.checkNotNull(lateral);
+ memoryManager.update();
+ unnest.setOutputCount(memoryManager.getOutputRowCount());
+ final int incomingRecordCount = incoming.getRecordCount();
+ final int currentRecord = lateral.getRecordIndex();
+ // We call this in setupSchema, but we also need to call it here so we have a reference to the
+ // appropriate vector inside of the unnest for the current batch.
+ setUnnestVector();
+
+ // Expected output count is the number of values in the unnest column array for the current record.
+ final int childCount =
+     incomingRecordCount == 0 ? 0 : unnest.getUnnestField().getAccessor().getInnerValueCountAt(currentRecord);
+
+ // Unnest the data
+ final int outputRecords = childCount == 0 ? 0 : unnest.unnestRecords(childCount);
+
+ logger.debug("{} values out of {} were processed.", outputRecords,
childCount);
+ // Keep track of any spill over into another batch. Happens only if you artificially set the
+ // output batch size for unnest to a low number.
+ if (outputRecords < childCount) {
+ hasRemainder = true;
+ remainderIndex = outputRecords;
+ this.recordCount = remainderIndex;
+ logger.debug("Output spilled into new batch. IterOutcome: OK.");
+ } else {
+ this.recordCount = outputRecords;
+ logger.debug("IterOutcome: EMIT.");
+ }
+
+ memoryManager.updateOutgoingStats(outputRecords);
+ // If the current incoming record has spilled into two batches, we return
+ // IterOutcome.OK so that the Lateral Join can keep calling next() until the
+ // entire incoming record has been unnested. Once the entire record has been
+ // unnested, we return EMIT and any blocking operators in the pipeline will
+ // unblock.
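+ // For example (illustrative numbers): a row whose unnest column holds 250 entries, with an
+ // output row limit of 100, comes back as two OK batches of 100 rows followed by an EMIT
+ // batch of 50 rows.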
+ return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
+ }
+
+ private IterOutcome handleRemainder() {
+ Preconditions.checkNotNull(lateral);
+ memoryManager.update();
+ unnest.setOutputCount(memoryManager.getOutputRowCount());
+ final int currentRecord = lateral.getRecordIndex();
+ final int remainingRecordCount =
+     unnest.getUnnestField().getAccessor().getInnerValueCountAt(currentRecord) - remainderIndex;
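+ // Continuing the illustration from doWork(): if the row holds 250 entries and 100 were
+ // already emitted, remainingRecordCount is 150 here (of which at most the configured output
+ // row count is unnested in this pass).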
+ final int outputRecords = unnest.unnestRecords(remainingRecordCount);
+ logger.debug("{} values out of {} were processed.", outputRecords,
remainingRecordCount);
+ if (outputRecords < remainingRecordCount) {
+ this.recordCount = outputRecords;
+ this.remainderIndex += outputRecords;
+ logger.debug("Output spilled into new batch. IterOutcome: OK.");
+ } else {
+ this.hasRemainder = false;
+ this.remainderIndex = 0;
+ this.recordCount = remainingRecordCount;
+ logger.debug("IterOutcome: EMIT.");
+ }
+ memoryManager.updateOutgoingStats(outputRecords);
+ return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
+ }
+
+ /**
+ * The data layout is the same for the actual data within a repeated field as it is in a scalar vector for
+ * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
+ * represent the lists. As the data layout for the actual values is the same in the repeated vector as in the
+ * scalar vector of the same type, we can avoid making individual copies for the column being unnested, and just
+ * use vector copies between the inner vector of the repeated field and the resulting scalar vector from the unnest
+ * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
+ * the end of one of the other vectors while we are copying the data of the other vectors alongside each new
+ * unnested value coming out of the repeated field).
+ */
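+ // Illustration (hypothetical values): a repeated-int column holding [[1, 2], [3]] is stored as an
+ // offset vector [0, 2, 3] plus an inner int vector [1, 2, 3]; unnest can transfer that inner vector
+ // wholesale instead of copying the values one at a time.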
+ @SuppressWarnings("resource") private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
+ final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
+ final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
+ final ValueVector unnestField =
+     incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
+
+ TransferPair tp = null;
+ if (unnestField instanceof RepeatedMapVector) {
+ tp = ((RepeatedMapVector) unnestField)
+     .getTransferPairToSingleMap(reference.getAsNamePart().getName(), oContext.getAllocator());
+ } else if (!(unnestField instanceof RepeatedValueVector)) {
+ if (incoming.getRecordCount() != 0) {
+ throw UserException.unsupportedError().message("Unnest does not support inputs of non-list values.")
+     .build(logger);
+ }
+ logger.error("Cannot cast {} to RepeatedValueVector", unnestField);
+ // when incoming recordCount is 0, don't throw an exception since the type being seen here is not solid
+ final ValueVector vv = new RepeatedMapVector(unnestField.getField(), oContext.getAllocator(), null);
+ tp = RepeatedValueVector.class.cast(vv)
+     .getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+ } else {
+ final ValueVector vvIn = RepeatedValueVector.class.cast(unnestField).getDataVector();
+ // vvIn may be null because of fast schema return for repeated list vectors.
+ if (vvIn != null) {
+ tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+ }
+ }
+ return tp;
+ }
+
+ @Override protected boolean setupNewSchema() throws SchemaChangeException {
+ Preconditions.checkNotNull(lateral);
--- End diff ---
This check is not needed here since it's already being done inside `innerNext()` before `setupNewSchema()` is invoked.
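
For example, a minimal sketch of the suggested cleanup (method body elided; assumes `innerNext()` remains the only caller):

```java
@Override protected boolean setupNewSchema() throws SchemaChangeException {
  // No Preconditions.checkNotNull(lateral) needed here: innerNext() already
  // performs that check before either of its calls to setupNewSchema().
  ...
}
```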
---