Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r100703575
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---
    @@ -0,0 +1,1456 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
    +import org.apache.drill.exec.physical.impl.spill.SpillSet;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)</li>
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * Data is spilled to disk as a "run". A run consists of one or more (typically
    + * many) batches, each of which is itself a sorted run of records.
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 64K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. Further, memory must hold one batch
    + * from each run, so we may need to reduce the number of runs so that the
    + * remaining runs can fit into memory. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The comma delimited list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dd>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files. Set this large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass (limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed the allocator assigned to this batch
    + * as set by the Foreman, or</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
    + * testing.)</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
    + * the entire in-memory sort set. A selection vector remover will copy results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills to
    + * disk. In this case, the downstream operator will still be a selection vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the copying
    + * to eliminate the extra selection vector remover. That is left as an exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <p>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  /**
    +   * In the bizarre case where the user gave us an unrealistically low
    +   * spill file size, set a floor at some bare minimum size. (Note that,
    +   * at this size, big queries will create a huge number of files, which
    +   * is why the configuration default is on the order of hundreds of MB.)
    +   */
    +
    +  private static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +  public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024;
    +  public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024;
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int mergeBatchRowCount;
    +  private int peakNumBatches = -1;
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRowWidth;
    +
    +  /**
    +   * Size of the merge batches that this operator produces. Generally
    +   * the same as the preferred merge batch size, unless low memory forces a smaller
    +   * value.
    +   */
    +
    +  private long targetMergeBatchSize;
    +
    +  /**
    +   * Estimate of the input batch size based on the largest batch seen
    +   * thus far.
    +   */
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we are down to the level given by the spill point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +  private long bufferMemoryPool;
    +  private boolean hasOversizeCols;
    +  private long totalInputBytes;
    +  private Long spillBatchSize;
    +  private int maxDensity;
    +
    +  /**
    +   * Estimated number of rows that fit into a single spill batch.
    +   */
    +
    +  private int spillBatchRowCount;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    SPILL_MB;               // Number of MB of data spilled to disk. This
    +                            // amount is first written, then later re-read.
    +                            // So, disk I/O is twice this amount.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use as set by the
    +    // operator definition (propagated to the allocator.)
    +
    +    memoryLimit = allocator.getLimit();
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Ensure the size is reasonable.
    +
    +    spillFileSize = Math.max(spillFileSize, MIN_SPILL_FILE_SIZE);
    +
    +    // The spill batch size. This is a critical setting for performance.
    +    // Set too large and the ratio between memory and input data sizes becomes
    +    // small. Set too small and disk seek times dominate performance.
    +
    +    spillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
    +    spillBatchSize = Math.max(spillBatchSize, MIN_SPILL_BATCH_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE);
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "spill file size = {}, batch size = {}, merge limit = {}, merge batch size = {}",
    +                  memoryLimit, bufferedBatchLimit, spillFileSize, spillBatchSize, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // collect all failure and make sure to cleanup all remaining batches
    +        // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
    +        // where it would have been passed to context.fail()
    +        // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
    +        // right away, this will also make sure we collect any additional exception we may get while cleaning up
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and setup the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        throw new IllegalStateException("Unexpected iter outcome: " + outcome);
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * all the incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} records",
    +                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, the
    +   * external sort accepts only one schema.
    +   *
    +   * @return return code depending on the amount of data read from upstream
    +   */
    +
    +  private IterOutcome loadBatch() {
    +
    +    // If this is the very first batch, then AbstractRecordBatch
    +    // already loaded it for us in buildSchema().
    +
    +    IterOutcome upstream;
    +    if (sortState == SortState.START) {
    +      sortState = SortState.LOAD;
    +      upstream = IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      upstream = next(incoming);
    +    }
    +    switch (upstream) {
    +    case NONE:
    +    case STOP:
    +      return upstream;
    +    case OK_NEW_SCHEMA:
    +    case OK:
    +      setupSchema(upstream);
    +
    +      // Add the batch to the in-memory generation, spilling if
    +      // needed.
    +
    +      processBatch();
    +      break;
    +    case OUT_OF_MEMORY:
    +
    +      // Note: it is highly doubtful that this code actually works. It
    +      // requires that the upstream batches got to a safe place to run
    +      // out of memory and that no work was in-flight and thus abandoned.
    +      // Consider removing this case once resource management is in place.
    +
    +      logger.debug("received OUT_OF_MEMORY, trying to spill");
    +      if (bufferedBatches.size() > 2) {
    +        spillFromMemory();
    +      } else {
    +        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
    +        return IterOutcome.OUT_OF_MEMORY;
    +      }
    +      break;
    +    default:
    +      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Load the results and sort them. May bail out early if an exceptional
    +   * condition is passed up from the input batch.
    +   *
    +   * @return return code: OK_NEW_SCHEMA if rows were sorted,
    +   * NONE if no rows
    +   */
    +
    +  private IterOutcome load() {
    +    logger.trace("Start of load phase");
    +
    +    // Clear the temporary container created by
    +    // buildSchema().
    +
    +    container.clear();
    +
    +    // Loop over all input batches
    +
    +    for (;;) {
    +      IterOutcome result = loadBatch();
    +
    +      // None means all batches have been read.
    +
    +      if (result == IterOutcome.NONE) {
    +        break; }
    +
    +      // Any outcome other than OK means something went wrong.
    +
    +      if (result != IterOutcome.OK) {
    +        return result; }
    +    }
    +
    +    // Anything to actually sort?
    +
    +    if (inputRecordCount == 0) {
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +    logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
    +                 inputBatchCount, spilledRuns.size(), totalInputBytes);
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    if (canUseMemoryMerge()) {
    +      return sortInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spillSet.hasSpilled()) { return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    long allocMem = allocator.getAllocatedMemory();
    +    long availableMem = memoryLimit - allocMem;
    +    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
    +    if (availableMem < neededForInMemorySort) { return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
    +
    +  /**
    +   * Handle a new schema from upstream. The ESB is quite limited in its ability
    +   * to handle schema changes.
    +   *
    +   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    +   */
    +
    +  private void setupSchema(IterOutcome upstream)  {
    +
    +    // First batch: we won't have a schema.
    +
    +    if (schema == null) {
    +      schema = incoming.getSchema();
    +
    +    // Subsequent batches, nothing to do if same schema.
    +
    +    } else if (upstream == IterOutcome.OK) {
    +      return;
    +
    +    // Only change in the case that the schema truly changes. Artificial schema changes are ignored.
    +
    +    } else if (incoming.getSchema().equals(schema)) {
    +      return;
    +    } else if (unionTypeEnabled) {
    +        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
    +
    +        // New schema: must generate a new sorter and copier.
    +
    +        opCodeGen.setSchema(schema);
    +    } else {
    +      throw UserException.unsupportedError()
    +            .message("Schema changes not supported in External Sort. Please enable Union type.")
    +            .build(logger);
    +    }
    +
    +    // Coerce all existing batches to the new schema.
    +
    +    for (BatchGroup b : bufferedBatches) {
    +//      System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only
    +      b.setSchema(schema);
    +//      System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only
    +    }
    +    for (BatchGroup b : spilledRuns) {
    +      b.setSchema(schema);
    +    }
    +  }
    +
    +  /**
    +   * Convert an incoming batch into the agreed-upon format. (Also seems to
    +   * make a persistent shallow copy of the batch saved until we are ready
    +   * to sort or spill.)
    +   *
    +   * @return the converted batch, or null if the incoming batch is empty
    +   */
    +
    +  private VectorContainer convertBatch() {
    +
    +    // Must accept the batch even if no records. Then clear
    +    // the vectors to release memory since we won't do any
    +    // further processing with the empty batch.
    +
    +    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
    +    if (incoming.getRecordCount() == 0) {
    +      for (VectorWrapper<?> w : convertedBatch) {
    +        w.clear();
    +      }
    +      return null;
    +    }
    +    return convertedBatch;
    +  }
    +
    +  private SelectionVector2 makeSelectionVector() {
    +    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
    +      return incoming.getSelectionVector2().clone();
    +    } else {
    +      return newSV2();
    +    }
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory store
    +   * of data, or spilling data to disk when necessary.
    +   */
    +
    +  @SuppressWarnings("resource")
    +  private void processBatch() {
    +
    +    // Skip empty batches (such as the first one.)
    +
    +    if (incoming.getRecordCount() == 0) {
    +      return;
    +    }
    +
    +    // Determine actual sizes of the incoming batch before taking
    +    // ownership. Allows us to figure out if we need to spill first,
    +    // to avoid overflowing memory simply due to ownership transfer.
    +
    +    RecordBatchSizer sizer = analyzeIncomingBatch();
    --- End diff --
    
    Is calling analyzeIncomingBatch() strictly necessary when the schema has
    not changed from one batch to the next? I suppose you could argue that the
    average width of a VARCHAR column could change substantially from one batch
    to the next. Is that the scenario we want to handle here?
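
    To make the VARCHAR scenario concrete, here is a minimal sketch. It is not
    Drill code: BatchWidthSketch and averageWidth are hypothetical names, and
    plain Java strings stand in for a VARCHAR vector. It only illustrates that
    two batches with an identical schema can have very different average value
    widths, so an estimate taken from the first batch alone could be far off.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical stand-in for a per-batch size analysis; not Drill's RecordBatchSizer. */
final class BatchWidthSketch {

  /** Average UTF-8 encoded width, in bytes, of the values in one VARCHAR column of a batch. */
  static int averageWidth(List<String> varcharColumn) {
    if (varcharColumn.isEmpty()) {
      return 0;
    }
    long totalBytes = 0;
    for (String value : varcharColumn) {
      totalBytes += value.getBytes(StandardCharsets.UTF_8).length;
    }
    return (int) (totalBytes / varcharColumn.size());
  }

  public static void main(String[] args) {
    // Same schema (a single VARCHAR column), very different data.
    List<String> firstBatch = List.of("a", "bb", "ccc");
    List<String> secondBatch = List.of("x".repeat(400), "y".repeat(600));

    System.out.println(averageWidth(firstBatch));  // prints 2
    System.out.println(averageWidth(secondBatch)); // prints 500
  }
}
```

    If widths like these are realistic for the workloads in question, sizing
    each batch seems justified; if not, the analysis could arguably be limited
    to batches that arrive with OK_NEW_SCHEMA.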

