Github user sohami commented on a diff in the pull request: https://github.com/apache/drill/pull/1223#discussion_r183110851 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java --- @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.unnest; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.physical.base.LateralContract; +import org.apache.drill.exec.physical.config.UnnestPOP; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; +import org.apache.drill.exec.vector.complex.RepeatedValueVector; + +import java.io.IOException; +import java.util.List; + +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; + +// TODO - handle the 
case where a user tries to unnest a scalar, should just return the column as is +public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class); + + private Unnest unnest; + private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want + // to keep processing it. Kill may be called by a limit in a subquery that + // requires us to stop processing the current row, but not stop processing + // the data. + // In some cases we need to return a predetermined state from a call to next. These are: + // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE + // 2) Kill is called by LIMIT to stop processing of the current row (This occurs when the LIMIT is part of a subquery + // between UNNEST and LATERAL). IterOutcome should be EMIT + // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE + private IterOutcome nextState = OK; + private int remainderIndex = 0; + private int recordCount; + private MaterializedField unnestFieldMetadata; + private final UnnestMemoryManager memoryManager; + + public enum Metric implements MetricDef { + INPUT_BATCH_COUNT, + AVG_INPUT_BATCH_BYTES, + AVG_INPUT_ROW_BYTES, + INPUT_RECORD_COUNT, + OUTPUT_BATCH_COUNT, + AVG_OUTPUT_BATCH_BYTES, + AVG_OUTPUT_ROW_BYTES, + OUTPUT_RECORD_COUNT; + + @Override + public int metricId() { + return ordinal(); + } + } + + /** + * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten. + */ + private class UnnestMemoryManager extends RecordBatchMemoryManager { + + private UnnestMemoryManager(int outputBatchSize) { + super(outputBatchSize); + } + + @Override + public void update() { + // Get sizing information for the batch. 
+ setRecordBatchSizer(new RecordBatchSizer(incoming)); + + final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); + final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + + // Get column size of unnest column. + RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer + .getColumn(incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector(), + field.getName()); + + // Average rowWidth of single element in the unnest list. + // subtract the offset vector size from column data size. + final int avgRowWidthSingleUnnestEntry = RecordBatchSizer + .safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), columnSize + .getElementCount()); + + // Average rowWidth of outgoing batch. + final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry; + + // Number of rows in outgoing batch + final int outputBatchSize = getOutputBatchSize(); + // Number of rows in outgoing batch + setOutputRowCount(outputBatchSize, avgOutgoingRowWidth); + + setOutgoingRowWidth(avgOutgoingRowWidth); + + // Limit to lower bound of total number of rows possible for this batch + // i.e. all rows fit within memory budget. + setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); + + logger.debug("incoming batch size : {}", getRecordBatchSizer()); + + logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", + outputBatchSize, avgOutgoingRowWidth, getOutputRowCount()); + + updateIncomingStats(); + } + + } + + + public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException { + super(pop, context); + // get the output batch size from config. 
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + memoryManager = new UnnestMemoryManager(configuredBatchSize); + } + + @Override + public int getRecordCount() { + return recordCount; + } + + protected void killIncoming(boolean sendUpstream) { + // Kill may be received from an operator downstream of the corresponding lateral, or from + // a limit that is in a subquery between unnest and lateral. In the latter case, unnest has to handle the limit. + // In the former case, Lateral will handle most of the kill handling. + + Preconditions.checkNotNull(lateral); + // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming + if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) { + logger.debug("Kill received. Stopping all processing"); + nextState = IterOutcome.NONE ; + } else { + // if we have already processed the record, then kill from a limit has no meaning. + // if, however, we have values remaining to be emitted, and limit has been reached, + // we abandon the remainder and send an empty batch with EMIT. + logger.debug("Kill received from subquery. Stopping processing of current input row."); + if(hasRemainder) { + nextState = IterOutcome.EMIT; + } + } + hasRemainder = false; // whatever the case, we need to stop processing the current row. + } + + + @Override + public IterOutcome innerNext() { + + Preconditions.checkNotNull(lateral); + + // Short circuit if record batch has already sent all data and is done + if (state == BatchState.DONE) { + return IterOutcome.NONE; + } + + if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) { + return nextState; + } + + if (hasRemainder) { + return handleRemainder(); + } + + // We do not need to call next() unlike the other operators. + // When unnest's innerNext is called, the LateralJoin would have already + // updated the incoming vector. 
+ // We do, however, need to call doWork() to do the actual work. + // We also need to handle schema build if it is the first batch + + if ((state == BatchState.FIRST)) { + state = BatchState.NOT_FIRST; + try { + stats.startSetup(); + hasRemainder = true; // next call to next will handle the actual data. + logger.debug("First batch received"); + schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the + // current field metadata for check in subsequent iterations + setupNewSchema(); + } catch (SchemaChangeException ex) { + kill(false); + logger.error("Failure during query", ex); + context.getExecutorState().fail(ex); + return IterOutcome.STOP; + } finally { + stats.stopSetup(); + } + return IterOutcome.OK_NEW_SCHEMA; + } else { + assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA"; + container.zeroVectors(); + + // Check if schema has changed + if (lateral.getRecordIndex() == 0 && schemaChanged()) { + hasRemainder = true; // next call to next will handle the actual data. + try { + setupNewSchema(); + } catch (SchemaChangeException ex) { + kill(false); + logger.error("Failure during query", ex); + context.getExecutorState().fail(ex); + return IterOutcome.STOP; + } + return OK_NEW_SCHEMA; + } + if (lateral.getRecordIndex() == 0) { + unnest.resetGroupIndex(); + } + return doWork(); + } + + } + + @Override + public VectorContainer getOutgoingContainer() { + return this.container; + } + + @SuppressWarnings("resource") private void setUnnestVector() { --- End diff -- Please refactor `setUnnestVector` and `getUnnestFieldTransferPair` into a single method.
---