Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1223#discussion_r183110851
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 ---
    @@ -0,0 +1,451 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.unnest;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.FieldReference;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.data.NamedExpression;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.base.LateralContract;
    +import org.apache.drill.exec.physical.config.UnnestPOP;
    +import org.apache.drill.exec.record.AbstractSingleRecordBatch;
    +import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.CloseableRecordBatch;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.RecordBatchMemoryManager;
    +import org.apache.drill.exec.record.RecordBatchSizer;
    +import org.apache.drill.exec.record.TransferPair;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.RepeatedMapVector;
    +import org.apache.drill.exec.vector.complex.RepeatedValueVector;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
    +import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    +
    +// TODO - handle the case where a user tries to unnest a scalar, should 
just return the column as is
    +public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPOP> {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
    +
    +  private Unnest unnest;
    +  private boolean hasRemainder = false; // set to true if there is data 
left over for the current row AND if we want
    +                                        // to keep processing it. Kill may 
be called by a limit in a subquery that
    +                                        // requires us to stop processing 
the current row, but not stop processing
    +                                        // the data.
    +  // In some cases we need to return a predetermined state from a call to 
next. These are:
    +  // 1) Kill is called due to an error occurring in the processing of the 
query. IterOutcome should be NONE
    +  // 2) Kill is called by LIMIT to stop processing of the current row 
(This occurs when the LIMIT is part of a subquery
    +  //    between UNNEST and LATERAL). IterOutcome should be EMIT
    +  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome 
should be NONE
    +  private IterOutcome nextState = OK;
    +  private int remainderIndex = 0;
    +  private int recordCount;
    +  private MaterializedField unnestFieldMetadata;
    +  private final UnnestMemoryManager memoryManager;
    +
    +  public enum Metric implements MetricDef {
    +    INPUT_BATCH_COUNT,
    +    AVG_INPUT_BATCH_BYTES,
    +    AVG_INPUT_ROW_BYTES,
    +    INPUT_RECORD_COUNT,
    +    OUTPUT_BATCH_COUNT,
    +    AVG_OUTPUT_BATCH_BYTES,
    +    AVG_OUTPUT_ROW_BYTES,
    +    OUTPUT_RECORD_COUNT;
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Memory manager for Unnest. Estimates the batch size exactly like we 
do for Flatten.
    +   */
    +  private class UnnestMemoryManager extends RecordBatchMemoryManager {
    +
    +    private UnnestMemoryManager(int outputBatchSize) {
    +      super(outputBatchSize);
    +    }
    +
    +    @Override
    +    public void update() {
    +      // Get sizing information for the batch.
    +      setRecordBatchSizer(new RecordBatchSizer(incoming));
    +
    +      final TypedFieldId typedFieldId = 
incoming.getValueVectorId(popConfig.getColumn());
    +      final MaterializedField field = 
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
    +
    +      // Get column size of unnest column.
    +      RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer
    +          .getColumn(incoming.getValueAccessorById(field.getValueClass(), 
typedFieldId.getFieldIds()).getValueVector(),
    +              field.getName());
    +
    +      // Average rowWidth of single element in the unnest list.
    +      // subtract the offset vector size from column data size.
    +      final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
    +          .safeDivide(columnSize.getTotalNetSize() - 
(getOffsetVectorWidth() * columnSize.getValueCount()), columnSize
    +              .getElementCount());
    +
    +      // Average rowWidth of outgoing batch.
    +      final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry;
    +
    +      // Configured output batch size (the memory budget for the outgoing batch)
    +      final int outputBatchSize = getOutputBatchSize();
    +      // Number of rows in outgoing batch
    +      setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
    +
    +      setOutgoingRowWidth(avgOutgoingRowWidth);
    +
    +      // Limit to lower bound of total number of rows possible for this 
batch
    +      // i.e. all rows fit within memory budget.
    +      setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
    +
    +      logger.debug("incoming batch size : {}", getRecordBatchSizer());
    +
    +      logger.debug("output batch size : {}, avg outgoing rowWidth : {}, 
output rowCount : {}",
    +          outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
    +
    +      updateIncomingStats();
    +    }
    +
    +  }
    +
    +
    +  public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws 
OutOfMemoryException {
    +    super(pop, context);
    +    // get the output batch size from config.
    +    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    +    memoryManager = new UnnestMemoryManager(configuredBatchSize);
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    return recordCount;
    +  }
    +
    +  protected void killIncoming(boolean sendUpstream) {
    +    // Kill may be received from an operator downstream of the 
corresponding lateral, or from
    +    // a limit that is in a subquery between unnest and lateral. In the 
latter case, unnest has to handle the limit.
    +    // In the former case, Lateral will handle most of the kill handling.
    +
    +    Preconditions.checkNotNull(lateral);
    +    // Do not call kill on incoming. Lateral Join has the responsibility 
for killing incoming
    +    if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() 
== IterOutcome.STOP) {
    +      logger.debug("Kill received. Stopping all processing");
    +      nextState = IterOutcome.NONE ;
    +    } else {
    +      // if we have already processed the record, then kill from a limit 
has no meaning.
    +      // if, however, we have values remaining to be emitted, and limit 
has been reached,
    +      // we abandon the remainder and send an empty batch with EMIT.
    +      logger.debug("Kill received from subquery. Stopping processing of 
current input row.");
    +      if(hasRemainder) {
    +        nextState = IterOutcome.EMIT;
    +      }
    +    }
    +    hasRemainder = false; // whatever the case, we need to stop processing 
the current row.
    +  }
    +
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +
    +    Preconditions.checkNotNull(lateral);
    +
    +    // Short circuit if record batch has already sent all data and is done
    +    if (state == BatchState.DONE) {
    +      return IterOutcome.NONE;
    +    }
    +
    +    if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
    +      return nextState;
    +    }
    +
    +    if (hasRemainder) {
    +      return handleRemainder();
    +    }
    +
    +    // We do not need to call next() unlike the other operators.
    +    // When unnest's innerNext is called, the LateralJoin would have 
already
    +    // updated the incoming vector.
    +    // We do, however, need to call doWork() to do the actual work.
    +    // We also need to handle schema build if it is the first batch
    +
    +    if ((state == BatchState.FIRST)) {
    +      state = BatchState.NOT_FIRST;
    +      try {
    +        stats.startSetup();
    +        hasRemainder = true; // next call to next will handle the actual 
data.
    +        logger.debug("First batch received");
    +        schemaChanged(); // checks if schema has changed (redundant in 
this case because it has) AND saves the
    +                         // current field metadata for check in subsequent 
iterations
    +        setupNewSchema();
    +      } catch (SchemaChangeException ex) {
    +        kill(false);
    +        logger.error("Failure during query", ex);
    +        context.getExecutorState().fail(ex);
    +        return IterOutcome.STOP;
    +      } finally {
    +        stats.stopSetup();
    +      }
    +      return IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      assert state != BatchState.FIRST : "First batch should be 
OK_NEW_SCHEMA";
    +      container.zeroVectors();
    +
    +      // Check if schema has changed
    +      if (lateral.getRecordIndex() == 0 && schemaChanged()) {
    +        hasRemainder = true;     // next call to next will handle the 
actual data.
    +        try {
    +          setupNewSchema();
    +        } catch (SchemaChangeException ex) {
    +          kill(false);
    +          logger.error("Failure during query", ex);
    +          context.getExecutorState().fail(ex);
    +          return IterOutcome.STOP;
    +        }
    +        return OK_NEW_SCHEMA;
    +      }
    +      if (lateral.getRecordIndex() == 0) {
    +        unnest.resetGroupIndex();
    +      }
    +      return doWork();
    +    }
    +
    +  }
    +
    +    @Override
    +  public VectorContainer getOutgoingContainer() {
    +    return this.container;
    +  }
    +
    +  @SuppressWarnings("resource") private void setUnnestVector() {
    --- End diff --
    
    Please refactor `setUnnestVector` and `getUnnestFieldTransferPair` into a 
single method.


---

Reply via email to