Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/1223#discussion_r183110851
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
+public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+ private Unnest unnest;
+ private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+                                       // to keep processing it. Kill may be called by a limit in a subquery that
+                                       // requires us to stop processing the current row, but not stop processing
+                                       // the data.
+ // In some cases we need to return a predetermined state from a call to next. These are:
+ // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+ // 2) Kill is called by LIMIT to stop processing of the current row (this occurs when the LIMIT is part of a subquery
+ //    between UNNEST and LATERAL). IterOutcome should be EMIT
+ // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+ private IterOutcome nextState = OK;
+ private int remainderIndex = 0;
+ private int recordCount;
+ private MaterializedField unnestFieldMetadata;
+ private final UnnestMemoryManager memoryManager;
+
+ public enum Metric implements MetricDef {
+ INPUT_BATCH_COUNT,
+ AVG_INPUT_BATCH_BYTES,
+ AVG_INPUT_ROW_BYTES,
+ INPUT_RECORD_COUNT,
+ OUTPUT_BATCH_COUNT,
+ AVG_OUTPUT_BATCH_BYTES,
+ AVG_OUTPUT_ROW_BYTES,
+ OUTPUT_RECORD_COUNT;
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
+ /**
+ * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten.
+ */
+ private class UnnestMemoryManager extends RecordBatchMemoryManager {
+
+ private UnnestMemoryManager(int outputBatchSize) {
+ super(outputBatchSize);
+ }
+
+ @Override
+ public void update() {
+ // Get sizing information for the batch.
+ setRecordBatchSizer(new RecordBatchSizer(incoming));
+
+ final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+
+ // Get column size of unnest column.
+ RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer
+ .getColumn(incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector(),
+ field.getName());
+
+ // Average rowWidth of single element in the unnest list.
+ // subtract the offset vector size from column data size.
+ final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
+ .safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), columnSize
+ .getElementCount());
+
+ // Average rowWidth of outgoing batch.
+ final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry;
+
+ // Size (in bytes) of the outgoing batch.
+ final int outputBatchSize = getOutputBatchSize();
+ // Number of rows in the outgoing batch.
+ setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
+
+ setOutgoingRowWidth(avgOutgoingRowWidth);
+
+ // Limit to lower bound of total number of rows possible for this batch
+ // i.e. all rows fit within memory budget.
+ setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
+
+ logger.debug("incoming batch size : {}", getRecordBatchSizer());
+
+ logger.debug("output batch size : {}, avg outgoing rowWidth : {},
output rowCount : {}",
+ outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+
+ updateIncomingStats();
+ }
+
+ }
+
+
+ public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException {
+ super(pop, context);
+ // get the output batch size from config.
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ memoryManager = new UnnestMemoryManager(configuredBatchSize);
+ }
+
+ @Override
+ public int getRecordCount() {
+ return recordCount;
+ }
+
+ protected void killIncoming(boolean sendUpstream) {
+ // Kill may be received from an operator downstream of the corresponding lateral, or from
+ // a limit that is in a subquery between unnest and lateral. In the latter case, unnest has to handle the limit.
+ // In the former case, Lateral will handle most of the kill handling.
+
+ Preconditions.checkNotNull(lateral);
+ // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming.
+ if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) {
+ logger.debug("Kill received. Stopping all processing");
+ nextState = IterOutcome.NONE;
+ } else {
+ // If we have already processed the record, then kill from a limit has no meaning.
+ // If, however, we have values remaining to be emitted, and limit has been reached,
+ // we abandon the remainder and send an empty batch with EMIT.
+ logger.debug("Kill received from subquery. Stopping processing of current input row.");
+ if (hasRemainder) {
+ nextState = IterOutcome.EMIT;
+ }
+ }
+ hasRemainder = false; // whatever the case, we need to stop processing the current row.
+ }
+
+
+ @Override
+ public IterOutcome innerNext() {
+
+ Preconditions.checkNotNull(lateral);
+
+ // Short circuit if record batch has already sent all data and is done
+ if (state == BatchState.DONE) {
+ return IterOutcome.NONE;
+ }
+
+ if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
+ return nextState;
+ }
+
+ if (hasRemainder) {
+ return handleRemainder();
+ }
+
+ // Unlike other operators, we do not need to call next() on the incoming batch here.
+ // When unnest's innerNext is called, the LateralJoin will already have updated the incoming vector.
+ // We do, however, need to call doWork() to do the actual work.
+ // We also need to handle schema build if it is the first batch.
+
+ if (state == BatchState.FIRST) {
+ state = BatchState.NOT_FIRST;
+ try {
+ stats.startSetup();
+ hasRemainder = true; // next call to next will handle the actual data.
+ logger.debug("First batch received");
+ schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
+                  // current field metadata for checks in subsequent iterations
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ } finally {
+ stats.stopSetup();
+ }
+ return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
+ container.zeroVectors();
+
+ // Check if schema has changed
+ if (lateral.getRecordIndex() == 0 && schemaChanged()) {
+ hasRemainder = true; // next call to next will handle the actual data.
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ return OK_NEW_SCHEMA;
+ }
+ if (lateral.getRecordIndex() == 0) {
+ unnest.resetGroupIndex();
+ }
+ return doWork();
+ }
+
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ return this.container;
+ }
+
+ @SuppressWarnings("resource") private void setUnnestVector() {
--- End diff --
Please refactor `setUnnestVector` and `getUnnestFieldTransferPair` into a
single method.
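
As a rough illustration only (the bodies of the two methods are not visible in this hunk), the combined method could resolve the unnest column once, hand its vector to the unnest operator, and build the transfer pair in the same place. This is a minimal sketch under stated assumptions: the method name, the `setUnnestField` setter, and the transfer-pair creation are guesses based on the existing names, not the PR's actual code.

```java
// Hypothetical shape of the requested refactor; calls marked "assumed" are not
// taken from this PR.
private TransferPair setUnnestVectorAndGetTransferPair(FieldReference reference) {
  // Resolve the unnest column in the incoming batch (same lookup the memory manager uses above).
  final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
  final MaterializedField field = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
  final ValueVector vector =
      incoming.getValueAccessorById(field.getValueClass(), fieldId.getFieldIds()).getValueVector();
  // Hand the repeated vector to the unnest operator (assumed setter on Unnest)...
  unnest.setUnnestField((RepeatedValueVector) vector);
  // ...and create the transfer pair for the outgoing container in the same method.
  return vector.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
}
```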
---