Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1223#discussion_r182873668

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java ---
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unnest;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+// TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
+public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+
+  private Unnest unnest;
+  private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+                                        // to keep processing it.
+                                        // Kill may be called by a limit in a subquery that
+                                        // requires us to stop processing the current row, but not stop processing
+                                        // the data.
+  // In some cases we need to return a predetermined state from a call to next. These are:
+  // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+  // 2) Kill is called by LIMIT to stop processing of the current row (this occurs when the LIMIT is part of a
+  //    subquery between UNNEST and LATERAL). IterOutcome should be EMIT
+  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+  private IterOutcome nextState = OK;
+  private int remainderIndex = 0;
+  private int recordCount;
+  private MaterializedField unnestFieldMetadata;
+  private final UnnestMemoryManager memoryManager;
+
+  public enum Metric implements MetricDef {
+    INPUT_BATCH_COUNT,
+    AVG_INPUT_BATCH_BYTES,
+    AVG_INPUT_ROW_BYTES,
+    INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
+  /**
+   * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten.
+   */
+  private class UnnestMemoryManager extends RecordBatchMemoryManager {
+
+    private UnnestMemoryManager(int outputBatchSize) {
+      super(outputBatchSize);
+    }
+
+    @Override
+    public void update() {
+      // Get sizing information for the batch.
+      setRecordBatchSizer(new RecordBatchSizer(incoming));
+
+      final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+
+      // Get column size of unnest column.
+      RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer
+          .getColumn(incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector(),
+              field.getName());
+
+      // Average rowWidth of a single element in the unnest list.
+      // Subtract the offset vector size from the column data size.
+      final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
+          .safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), columnSize
+              .getElementCount());
+
+      // Average rowWidth of outgoing batch.
+      final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry;
+
+      // Configured size limit (in bytes) of the outgoing batch.
+      final int outputBatchSize = getOutputBatchSize();
+      // Number of rows in outgoing batch.
+      setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
+
+      setOutgoingRowWidth(avgOutgoingRowWidth);
+
+      // Limit to lower bound of total number of rows possible for this batch
+      // i.e. all rows fit within memory budget.
+      setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
+
+      logger.debug("incoming batch size : {}", getRecordBatchSizer());
+
+      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}",
+          outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+
+      updateIncomingStats();
+    }
+
+  }
+
+
+  public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException {
+    super(pop, context);
+    // Get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new UnnestMemoryManager(configuredBatchSize);
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  protected void killIncoming(boolean sendUpstream) {
+    // Kill may be received from an operator downstream of the corresponding lateral, or from
+    // a limit that is in a subquery between unnest and lateral. In the latter case, unnest has to handle the limit.
+    // In the former case, Lateral will handle most of the kill handling.
+
+    Preconditions.checkNotNull(lateral);
+    // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming.
+    if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) {
+      logger.debug("Kill received. Stopping all processing");
+      nextState = IterOutcome.NONE;
+    } else {
+      // If we have already processed the record, then a kill from a limit has no meaning.
+      // If, however, we have values remaining to be emitted, and the limit has been reached,
+      // we abandon the remainder and send an empty batch with EMIT.
+      logger.debug("Kill received from subquery. Stopping processing of current input row.");
+      if (hasRemainder) {
+        nextState = IterOutcome.EMIT;
+      }
+    }
+    hasRemainder = false; // Whatever the case, we need to stop processing the current row.
+  }
+
+
+  @Override
+  public IterOutcome innerNext() {
+
+    Preconditions.checkNotNull(lateral);
+
+    // Short circuit if record batch has already sent all data and is done.
+    if (state == BatchState.DONE) {
+      return IterOutcome.NONE;
+    }
+
+    if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
+      return nextState;
+    }
+
+    if (hasRemainder) {
+      return handleRemainder();
+    }
+
+    // We do not need to call next() unlike the other operators.
+    // When unnest's innerNext is called, the LateralJoin would have already
+    // updated the incoming vector.
+    // We do, however, need to call doWork() to do the actual work.
+    // We also need to handle schema build if it is the first batch.
+
+    if (state == BatchState.FIRST) {
+      state = BatchState.NOT_FIRST;
+      try {
+        stats.startSetup();
+        hasRemainder = true; // next call to next() will handle the actual data.
+        logger.debug("First batch received");
+        schemaChanged(); // checks if the schema has changed (redundant in this case because it has) AND saves the
+                         // current field metadata for checks in subsequent iterations
+        setupNewSchema();
+      } catch (SchemaChangeException ex) {
+        kill(false);
+        logger.error("Failure during query", ex);
+        context.getExecutorState().fail(ex);
+        return IterOutcome.STOP;
+      } finally {
+        stats.stopSetup();
+      }
+      return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
+      container.zeroVectors();
+
+      // Check if the schema has changed.
+      if (lateral.getRecordIndex() == 0 && schemaChanged()) {
+        hasRemainder = true; // next call to next() will handle the actual data.
+        try {
+          setupNewSchema();
+        } catch (SchemaChangeException ex) {
+          kill(false);
+          logger.error("Failure during query", ex);
+          context.getExecutorState().fail(ex);
+          return IterOutcome.STOP;
+        }
+        return OK_NEW_SCHEMA;
+      }
+      if (lateral.getRecordIndex() == 0) {
+        unnest.resetGroupIndex();
+      }
+      return doWork();
+    }
+
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    return this.container;
+  }
+
+  @SuppressWarnings("resource") private void setUnnestVector() {
+    final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+    final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+    final RepeatedValueVector vector;
+    final ValueVector inVV =
+        incoming.getValueAccessorById(field.getValueClass(), typedFieldId.getFieldIds()).getValueVector();
+
+    if (!(inVV instanceof RepeatedValueVector)) {
+      if (incoming.getRecordCount() != 0) {
+        throw UserException.unsupportedError().message("Unnest does not support inputs of non-list values.")
+            .build(logger);
+      }
+      // Inherited from FLATTEN. When does this happen???
+      // When the incoming recordCount is 0, don't throw an exception since the type being seen here is not solid.
+      logger.error("setUnnestVector cast failed and recordcount is 0, create empty vector anyway.");
+      vector = new RepeatedMapVector(field, oContext.getAllocator(), null);
+    } else {
+      vector = RepeatedValueVector.class.cast(inVV);
+    }
+    unnest.setUnnestField(vector);
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    Preconditions.checkNotNull(lateral);
+    memoryManager.update();
+    unnest.setOutputCount(memoryManager.getOutputRowCount());
+    final int incomingRecordCount = incoming.getRecordCount();
+    final int currentRecord = lateral.getRecordIndex();
+    // We call this in setupSchema, but we also need to call it here so we have a reference to the appropriate vector
+    // inside of the unnest for the current batch.
+    setUnnestVector();
+
+    // Expected output count is the number of values in the unnest column array for the current record.
+    final int childCount =
+        incomingRecordCount == 0 ? 0 : unnest.getUnnestField().getAccessor().getInnerValueCountAt(currentRecord);
+
+    // Unnest the data.
+    final int outputRecords = childCount == 0 ? 0 : unnest.unnestRecords(childCount);
+
+    logger.debug("{} values out of {} were processed.", outputRecords, childCount);
+    // Keep track of any spill over into another batch. Happens only if you artificially set the output batch
+    // size for unnest to a low number.
+    if (outputRecords < childCount) {
+      hasRemainder = true;
+      remainderIndex = outputRecords;
+      this.recordCount = remainderIndex;
+      logger.debug("Output spilled into new batch. IterOutcome: OK.");
+    } else {
+      this.recordCount = outputRecords;
+      logger.debug("IterOutcome: EMIT.");
+    }
+
+    memoryManager.updateOutgoingStats(outputRecords);
+    // If the current incoming record has spilled into two batches, we return
+    // IterOutcome.OK so that the Lateral Join can keep calling next() until the
+    // entire incoming record has been unnested. If the entire record has been
+    // unnested, we return EMIT and any blocking operators in the pipeline will
+    // unblock.
+    return hasRemainder ?
+        IterOutcome.OK : IterOutcome.EMIT;
+  }
+
+  private IterOutcome handleRemainder() {
+    Preconditions.checkNotNull(lateral);
+    memoryManager.update();
+    unnest.setOutputCount(memoryManager.getOutputRowCount());
+    final int currentRecord = lateral.getRecordIndex();
+    final int remainingRecordCount =
+        unnest.getUnnestField().getAccessor().getInnerValueCountAt(currentRecord) - remainderIndex;
+    final int outputRecords = unnest.unnestRecords(remainingRecordCount);
+    logger.debug("{} values out of {} were processed.", outputRecords, remainingRecordCount);
+    if (outputRecords < remainingRecordCount) {
+      this.recordCount = outputRecords;
+      this.remainderIndex += outputRecords;
+      logger.debug("Output spilled into new batch. IterOutcome: OK.");
+    } else {
+      this.hasRemainder = false;
+      this.remainderIndex = 0;
+      this.recordCount = remainingRecordCount;
+      logger.debug("IterOutcome: EMIT.");
+    }
+    memoryManager.updateOutgoingStats(outputRecords);
+    return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
+  }
+
+  /**
+   * The data layout is the same for the actual data within a repeated field as it is in a scalar vector for
+   * the same SQL type. For example, a repeated int vector has a vector of offsets into a regular int vector to
+   * represent the lists. Since the data layout for the actual values is the same in the repeated vector as in the
+   * scalar vector of the same type, we can avoid making individual copies for the column being unnested, and just
+   * use vector copies from the inner vector of the repeated field to the resulting scalar vector from the unnest
+   * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
+   * the end of one of the other vectors while we are copying the data of the other vectors alongside each new unnested
+   * value coming out of the repeated field).
+   */
+  @SuppressWarnings("resource") private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
+    final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
+    final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
+    final ValueVector unnestField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
+
+    TransferPair tp = null;
+    if (unnestField instanceof RepeatedMapVector) {
+      tp = ((RepeatedMapVector) unnestField)
+          .getTransferPairToSingleMap(reference.getAsNamePart().getName(), oContext.getAllocator());
+    } else if (!(unnestField instanceof RepeatedValueVector)) {
+      if (incoming.getRecordCount() != 0) {
+        throw UserException.unsupportedError().message("Unnest does not support inputs of non-list values.")
+            .build(logger);
+      }
+      logger.error("Cannot cast {} to RepeatedValueVector", unnestField);
+      // When the incoming recordCount is 0, don't throw an exception since the type being seen here is not solid.
+      final ValueVector vv = new RepeatedMapVector(unnestField.getField(), oContext.getAllocator(), null);
+      tp = RepeatedValueVector.class.cast(vv)
+          .getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+    } else {
+      final ValueVector vvIn = RepeatedValueVector.class.cast(unnestField).getDataVector();
+      // vvIn may be null because of fast schema return for repeated list vectors.
+      if (vvIn != null) {
+        tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+      }
+    }
+    return tp;
+  }
+
+  @Override protected boolean setupNewSchema() throws SchemaChangeException {
+    Preconditions.checkNotNull(lateral);
+    container.clear();
+    recordCount = 0;
+    final List<TransferPair> transfers = Lists.newArrayList();
+
+    final FieldReference fieldReference =
+        new FieldReference(SchemaPath.getSimplePath(popConfig.getColumn().toString() + "_flat"));
--- End diff --

Can you please add a TODO here since this will change once the planner-side changes are available?
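For illustration, a minimal sketch of the requested TODO (assuming the temporary "_flat" suffix on the unnest column
name is what the planner-side changes would replace):

    // TODO: Revisit the naming of the unnest output column once the planner-side changes are available;
    // the "_flat" suffix is a temporary convention and is expected to change.
    final FieldReference fieldReference =
        new FieldReference(SchemaPath.getSimplePath(popConfig.getColumn().toString() + "_flat"));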
---