[ 
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849332#comment-15849332
 ] 

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99024407
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---
    @@ -0,0 +1,1321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)</li>
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 32K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The (comma? space?) separated list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dd>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files. Set this large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass (limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed to the allocator assigned to this batch,</li>
    + * <li>The maximum limit set for this operator by the Foreman, or</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
    + * testing.)</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
    + * the entire in-memory sort set. A selection vector remover will copy results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills to
    + * disk. In this case, the downstream operator will still be a selection vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the copying
    + * to eliminate the extra selection vector remover. That is left as an exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <p>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int outputBatchRecordCount;
    +  private int peakNumBatches = -1;
    +
    +  /**
    +   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
    +   * number of records to return in each batch.
    +   * <p>
    +   * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
    +   * <p>
    +   * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a
    +   * design flaw that may give rise to fatal memory fragmentation if we allow it to
    +   * allocate larger vectors.
    +   */
    +
    +  private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRecordSize;
    +
    +  /**
    +   * Estimated size of the spill and output batches that this
    +   * operator produces, estimated from the estimated record
    +   * size.
    +   */
    +
    +  private long estimatedOutputBatchSize;
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private int minSpillLimit;
    +  private int maxSpillLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we are down to the level given by the spill point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    INPUT_BATCHES;          // Number of batches read from upstream.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use. It is either the
    +    // limit set on the allocator or on the operator, whichever is
    +    // less.
    +
    +    memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit());
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Limits on the minimum and maximum buffered batches to spill per
    +    // spill event.
    +
    +    minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2);
    +    maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2);
    +    if (minSpillLimit > maxSpillLimit) {
    +      minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
    +      maxSpillLimit = minSpillLimit;
    +    }
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}",
    +                  memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // Collect all failures and make sure to clean up all remaining batches.
    +        // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
    +        // where it would have been passed to context.fail().
    +        // Passing the exception directly to context.fail(e) lets the cleanup process continue instead of stopping
    +        // right away; this also makes sure we collect any additional exceptions we may get while cleaning up.
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      if (bufferedBatches != null) {
    +        closeBatchGroups(bufferedBatches);
    +        bufferedBatches = null;
    +      }
    +      if (spilledRuns != null) {
    +        closeBatchGroups(spilledRuns);
    +        spilledRuns = null;
    +      }
    +    } finally {
    +      if (sv4 != null) {
    +        sv4.clear();
    +      }
    +      if (resultsIterator != null) {
    +        resultsIterator.close();
    +      }
    +      copierHolder.close();
    +      spillSet.close();
    +      opCodeGen.close();
    +
    +      // The call to super.close() clears out the output container.
    +      // Doing so requires the allocator here, so it must be closed
    +      // after the super call.
    +
    +      super.close();
    +      allocator.close();
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and setup the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        break;
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * all the incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} records",
    +                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, the
    +   * external sort accepts only one schema. It can handle compatible schemas
    +   * (which seems to mean the same columns in possibly different orders.)
    +   *
    +   * @return return code depending on the amount of data read from upstream
    +   */
    +
    +  private IterOutcome loadBatch() {
    +
    +    // If this is the very first batch, then AbstractRecordBatch
    +    // already loaded it for us in buildSchema().
    +
    +    IterOutcome upstream;
    +    if (sortState == SortState.START) {
    +      sortState = SortState.LOAD;
    +      upstream = IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      upstream = next(incoming);
    +    }
    +    switch (upstream) {
    +    case NONE:
    +      return upstream;
    +    case NOT_YET:
    +      throw new UnsupportedOperationException();
    +    case STOP:
    +      return upstream;
    +    case OK_NEW_SCHEMA:
    +    case OK:
    +      setupSchema(upstream);
    +
    +      // Add the batch to the in-memory generation, spilling if
    +      // needed.
    +
    +      processBatch();
    +      break;
    +    case OUT_OF_MEMORY:
    +
    +      // Note: it is highly doubtful that this code actually works. It
    +      // requires that the upstream batches got to a safe place to run
    +      // out of memory and that no work was in flight and thus abandoned.
    +      // Consider removing this case once resource management is in place.
    +
    +      logger.debug("received OUT_OF_MEMORY, trying to spill");
    +      if (bufferedBatches.size() > 2) {
    +        spillFromMemory();
    +      } else {
    +        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
    +        return IterOutcome.OUT_OF_MEMORY;
    +      }
    +      break;
    +    default:
    +      throw new UnsupportedOperationException();
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Load the results and sort them. May bail out early if an exceptional
    +   * condition is passed up from the input batch.
    +   *
    +   * @return return code: OK_NEW_SCHEMA if rows were sorted,
    +   * NONE if no rows
    +   */
    +
    +  private IterOutcome load() {
    +    logger.trace("Start of load phase");
    +
    +    // Clear the temporary container created by
    +    // buildSchema().
    +
    +    container.clear();
    +
    +    // Loop over all input batches
    +
    +    for (;;) {
    +      IterOutcome result = loadBatch();
    +
    +      // None means all batches have been read.
    +
    +      if (result == IterOutcome.NONE) {
    +        break; }
    +
    +      // Any outcome other than OK means something went wrong.
    +
    +      if (result != IterOutcome.OK) {
    +        return result; }
    +    }
    +
    +    // Anything to actually sort?
    +
    +    if (inputRecordCount == 0) {
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +    logger.trace("Completed load phase: read {} batches", inputBatchCount);
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    if (canUseMemoryMerge()) {
    +      return sortInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spillSet.hasSpilled()) { return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    long allocMem = allocator.getAllocatedMemory();
    +    long availableMem = memoryLimit - allocMem;
    +    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
    +    if (availableMem < neededForInMemorySort) { return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
    +
    +  /**
    +   * Handle a new schema from upstream. The ESB is quite limited in its ability
    +   * to handle schema changes.
    +   *
    +   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    +   */
    +
    +  private void setupSchema(IterOutcome upstream)  {
    +
    +    // First batch: we won't have a schema.
    +
    +    if (schema == null) {
    +      schema = incoming.getSchema();
    +
    +    // Subsequent batches, nothing to do if same schema.
    +
    +    } else if (upstream == IterOutcome.OK) {
    +      return;
    +
    +    // Only change in the case that the schema truly changes. Artificial schema changes are ignored.
    +
    +    } else if (incoming.getSchema().equals(schema)) {
    +      return;
    +    } else if (unionTypeEnabled) {
    +        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
    +
    +        // New schema: must generate a new sorter and copier.
    +
    +        opCodeGen.setSchema(schema);
    +    } else {
    +      throw UserException.unsupportedError()
    +            .message("Schema changes not supported in External Sort. Please enable Union type.")
    +            .build(logger);
    +    }
    +
    +    // Coerce all existing batches to the new schema.
    +
    +    for (BatchGroup b : bufferedBatches) {
    +//      System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only
    +      b.setSchema(schema);
    +//      System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only
    +    }
    +    for (BatchGroup b : spilledRuns) {
    +      b.setSchema(schema);
    +    }
    +  }
    +
    +  /**
    +   * Convert an incoming batch into the agreed-upon format. (Also seems to
    +   * make a persistent shallow copy of the batch saved until we are ready
    +   * to sort or spill.)
    +   *
    +   * @return the converted batch, or null if the incoming batch is empty
    +   */
    +
    +  private VectorContainer convertBatch() {
    +
    +    // Must accept the batch even if no records. Then clear
    +    // the vectors to release memory since we won't do any
    +    // further processing with the empty batch.
    +
    +    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
    +    if (incoming.getRecordCount() == 0) {
    +      for (VectorWrapper<?> w : convertedBatch) {
    +        w.clear();
    +      }
    +      return null;
    +    }
    +    return convertedBatch;
    +  }
    +
    +  private SelectionVector2 makeSelectionVector() {
    +    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
    +      return incoming.getSelectionVector2().clone();
    +    } else {
    +      return newSV2();
    +    }
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory store
    +   * of data, or spilling data to disk when necessary.
    +   */
    +
    +  @SuppressWarnings("resource")
    +  private void processBatch() {
    +
    +    // The heart of the external sort operator: spill to disk when
    +    // the in-memory generation exceeds the allowed memory limit.
    +    // Preemptively spill BEFORE accepting the new batch into our memory
    +    // pool. The allocator will throw an OOM exception if we accept the
    +    // batch when we are near the limit - despite the fact that the batch
    +    // is already in memory and no new memory is allocated during the transfer.
    +
    +    if (isSpillNeeded()) {
    +      spillFromMemory();
    +    }
    +
    +    // Sanity check. We should now be above the spill point.
    +
    +    long startMem = allocator.getAllocatedMemory();
    +    if (memoryLimit - startMem < spillPoint) {
    +      logger.error( "ERROR: Failed to spill below the spill point. Spill point = {}, free memory = {}",
    +                    spillPoint, memoryLimit - startMem);
    +    }
    +
    +    // Convert the incoming batch to the agreed-upon schema.
    +    // No converted batch means we got an empty input batch.
    +    // Converting the batch transfers memory ownership to our
    +    // allocator. This gives a round-about way to learn the batch
    +    // size: check the before and after memory levels, then use
    +    // the difference as the batch size, in bytes.
    +
    +    VectorContainer convertedBatch = convertBatch();
    +    if (convertedBatch == null) {
    +      return;
    +    }
    +
    +    SelectionVector2 sv2 = makeSelectionVector();
    +
    +    // Compute batch size, including allocation of an sv2.
    +
    +    long endMem = allocator.getAllocatedMemory();
    +    long batchSize = endMem - startMem;
    +    int count = sv2.getCount();
    +    inputRecordCount += count;
    +    inputBatchCount++;
    +    stats.setLongStat(Metric.INPUT_BATCHES, inputBatchCount);
    +
    +    // Update the minimum buffer space metric.
    +
    +    if (minimumBufferSpace == 0) {
    +      minimumBufferSpace = endMem;
    +    } else {
    +      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
    +    }
    +    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
    +
    +    // Update the size based on the actual record count, not
    +    // the effective count as given by the selection vector
    +    // (which may exclude some records due to filtering.)
    +
    +    updateMemoryEstimates(batchSize, convertedBatch.getRecordCount());
    +
    +    // Sort the incoming batch using either the original selection vector,
    +    // or a new one created here.
    +
    +    SingleBatchSorter sorter;
    +    sorter = opCodeGen.getSorter(convertedBatch);
    +    try {
    +      sorter.setup(context, sv2, convertedBatch);
    +    } catch (SchemaChangeException e) {
    +      convertedBatch.clear();
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change.")
    +            .build(logger);
    +    }
    +    try {
    +      sorter.sort(sv2);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +                .message("Unexpected schema change.")
    +                .build(logger);
    +    }
    +    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
    +    try {
    +      rbd.setSv2(sv2);
    +      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, batchSize));
    +      if (peakNumBatches < bufferedBatches.size()) {
    +        peakNumBatches = bufferedBatches.size();
    +        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
    +      }
    +
    +    } catch (Throwable t) {
    +      rbd.clear();
    +      throw t;
    +    }
    +  }
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   *
    +   * @param actualBatchSize the overall size of the current batch received from
    +   * upstream
    +   * @param actualRecordCount the number of actual (not filtered) records in
    +   * that upstream batch
    +   */
    +
    +  private void updateMemoryEstimates(long actualBatchSize, int actualRecordCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (actualRecordCount == 0) {
    +      return; }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    int batchRecordSize = (int) (actualBatchSize / actualRecordCount);
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origEstimate = estimatedRecordSize;
    +    estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize);
    +
    +    // Go no further if nothing changed.
    +
    +    if (estimatedRecordSize == origEstimate) {
    +      return; }
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch.
    +
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
    +
    +    // Estimate the input record count. Add one due to rounding.
    +
    +    long estimatedInputRecordCount = estimatedInputBatchSize / estimatedRecordSize + 1;
    +
    +    // Estimate the total size of each incoming batch plus sv2. Note that, due
    +    // to power-of-two rounding, the allocated size might be twice the data size.
    +
    +    long estimatedInputSize = estimatedInputBatchSize + 4 * estimatedInputRecordCount;
    +
    +    // Determine the number of records to spill per merge step. The goal is to
    +    // spill batches of either 32K records, or as many records as fit into the
    +    // amount of memory dedicated to each batch, whichever is less.
    +
    +    outputBatchRecordCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRecordSize);
    +    outputBatchRecordCount = Math.min(outputBatchRecordCount, Short.MAX_VALUE);
    +
    +    // Compute the estimated size of batches that this operator creates.
    +    // Note that this estimate DOES NOT apply to incoming batches as we have
    +    // no control over those. Note that the output batch size should be about
    +    // the same as the MAX_MERGED_BATCH_SIZE constant used to create the
    +    // estimated record count. But, it can be bigger if we have jumbo sized
    +    // records larger than the target output size.
    +
    +    estimatedOutputBatchSize = outputBatchRecordCount * estimatedRecordSize;
    +    estimatedOutputBatchSize = Math.max(estimatedOutputBatchSize, preferredMergeBatchSize);
    +
    +    // Determine the minimum memory needed for spilling. Spilling is done just
    +    // before accepting a batch, so we must spill if we don't have room for a
    +    // (worst case) input batch. To spill, we need room for the output batch created
    +    // by merging the batches already in memory. Double this to allow for power-of-two
    +    // memory allocations, then add one more as a margin of safety.
    +
    +    spillPoint = estimatedInputBatchSize + 3 * estimatedOutputBatchSize;
    +
    +    // Determine the minimum total memory we would need to receive two input
    +    // batches (the minimum needed to make progress) and the allowance for the
    +    // output batch.
    +
    +    long minLoadMemory = spillPoint + estimatedInputSize;
    +
    +    // The merge memory pool assumes we can spill all input batches. To make
    +    // progress, we must have at least two merge batches (same size as an output
    +    // batch) and one output batch. Again, double to allow for power-of-two
    +    // allocation and add one for a margin of error.
    +
    +    long minMergeMemory = (2*3 + 1) * estimatedOutputBatchSize;
    +
    +    // Determine how much memory can be used to hold in-memory batches of spilled
    +    // runs when reading from disk.
    +
    +    mergeMemoryPool = Math.max(minMergeMemory,
    +                               (long) ((memoryLimit - 3 * estimatedOutputBatchSize) * 0.95));
    +
    +    // Sanity check: if we've been given too little memory to make progress,
    +    // issue a warning but proceed anyway. Should only occur if something is
    +    // configured terribly wrong.
    +
    +    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
    +    if (minMemoryNeeds > memoryLimit) {
    +      logger.warn("updateMemoryEstimates: potential memory overflow! " +
    +                   "Minimum needed = {} bytes, actual available = {} bytes",
    +                   minMemoryNeeds, memoryLimit);
    +    }
    +
    +    // Log the calculated values. Turn this on if things seem amiss.
    +    // Message will appear only when the values change.
    +
    +    logger.debug("Memory Estimates: record size = {} bytes; input batch = {} bytes, {} records; " +
    +                  "output batch size = {} bytes, {} records; " +
    +                  "Available memory: {}, spill point = {}, min. merge memory = {}",
    +                estimatedRecordSize, estimatedInputBatchSize, estimatedInputRecordCount,
    +                estimatedOutputBatchSize, outputBatchRecordCount,
    +                memoryLimit, spillPoint, minMergeMemory);
    +  }
    +
    +  /**
    +   * Determine if spill is needed before receiving the new record batch.
    +   * Spilling is driven purely by memory availability (and an optional
    +   * batch limit for testing.)
    +   *
    +   * @return true if spilling is needed, false otherwise
    +   */
    +
    +  private boolean isSpillNeeded() {
    +
    +    // Can't spill if less than two batches else the merge
    +    // can't make progress.
    +
    +    if (bufferedBatches.size() < 2) {
    +      return false; }
    +
    +    // Must spill if we are below the spill point (the amount of memory
    +    // needed to do the minimal spill.)
    +
    +    long freeMemory = memoryLimit - allocator.getAllocatedMemory();
    +
    +    // Sanity check. If the memory goes negative, the calcs are off.
    +
    +    if (freeMemory < 0) {
    +      logger.error("ERROR: Free memory is negative: {}. Spill point = {}",
    +                   freeMemory, spillPoint);
    +    }
    +    if (freeMemory <= spillPoint) {
    +      return true; }
    +
    +    // Also spill if the number of buffered batches (BatchGroups) accumulated
    +    // since the last spill exceeds the configured limit.
    +
    +    return bufferedBatches.size() > bufferedBatchLimit;
    +  }
    +
    +  /**
    +   * Perform an in-memory sort of the buffered batches. Obviously can
    +   * be used only for the non-spilling case.
    +   *
    +   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
    +   */
    +
    +  private IterOutcome sortInMemory() {
    +    logger.info("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
    +                bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
    +
    +    // Note the difference between how we handle batches here and in the spill/merge
    +    // case. In the spill/merge case, this class decides on the batch size to send
    +    // downstream. However, in the in-memory case, we must pass along all batches
    +    // in a single SV4. Attempts to do paging will result in errors. In the memory
    +    // merge case, the downstream Selection Vector Remover will split the one
    +    // big SV4 into multiple smaller batches to send further downstream.
    +
    +    // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
    +    // by closing the resultsIterator after all results are returned downstream.
    +
    +    InMemorySorter memoryMerge = new InMemorySorter(context, allocator, opCodeGen);
    +    try {
    +      sv4 = memoryMerge.sort(bufferedBatches, this, container);
    +      if (sv4 == null) {
    +        sortState = SortState.DONE;
    +        return IterOutcome.STOP;
    +      } else {
    +        logger.info("Completed in-memory sort. Memory = {}",
    +                allocator.getAllocatedMemory());
    +        resultsIterator = memoryMerge;
    +        memoryMerge = null;
    +        sortState = SortState.DELIVER;
    +        return IterOutcome.OK_NEW_SCHEMA;
    +      }
    +    } finally {
    +      if (memoryMerge != null) {
    +        memoryMerge.close();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Perform merging of (typically spilled) batches. First consolidates batches
    +   * as needed, then performs a final merge that is read one batch at a time
    +   * to deliver batches to the downstream operator.
    +   *
    +   * @return always returns OK_NEW_SCHEMA
    +   */
    +
    +  private IterOutcome mergeSpilledRuns() {
    +    logger.info("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
    +                inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
    +                bufferedBatches.size(), spilledRuns.size());
    +
    +    // Consolidate batches to a number that can be merged in
    +    // a single last pass.
    +
    +    int mergeCount = 0;
    +    while (consolidateBatches()) {
    +      mergeCount++;
    +    }
    +    stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
    +
    +    // Merge in-memory batches and spilled runs for the final merge.
    +
    +    List<BatchGroup> allBatches = new LinkedList<>();
    +    allBatches.addAll(bufferedBatches);
    +    bufferedBatches.clear();
    +    allBatches.addAll(spilledRuns);
    +    spilledRuns.clear();
    +
    +    logger.info("Starting merge phase. Runs = {}, Alloc. memory = {}", allBatches.size(), allocator.getAllocatedMemory());
    +
    +    // Do the final merge as a results iterator.
    +
    +    CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, allBatches, container, outputBatchRecordCount);
    +    merger.next();
    +    resultsIterator = merger;
    +    sortState = SortState.DELIVER;
    +    return IterOutcome.OK_NEW_SCHEMA;
    +  }
    +
    +  private boolean consolidateBatches() {
    +
    +    // Determine additional memory needed to hold one batch from each
    +    // spilled run.
    +
    +    int inMemCount = bufferedBatches.size();
    +    int spilledRunsCount = spilledRuns.size();
    +
    +    // Can't merge more than will fit into memory at one time.
    +
    +    int maxMergeWidth = (int) (mergeMemoryPool / estimatedOutputBatchSize);
    +    maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
    +
    +    // If we can't fit all batches in memory, must spill any in-memory
    +    // batches to make room for multiple spill-merge-spill cycles.
    +
    +    if (inMemCount > 0) {
    +      if (spilledRunsCount > maxMergeWidth) {
    +        spillFromMemory();
    +        return true;
    +      }
    +
    +      // If we just plain have too many batches to merge, spill some
    +      // in-memory batches to reduce the burden.
    +
    +      if (inMemCount + spilledRunsCount > mergeLimit) {
    +        spillFromMemory();
    +        return true;
    +      }
    +
    +      // If the on-disk batches and in-memory batches need more memory than
    +      // is available, spill some in-memory batches.
    +
    +      long allocated = allocator.getAllocatedMemory();
    +      long totalNeeds = spilledRunsCount * estimatedOutputBatchSize + allocated;
    +      if (totalNeeds > mergeMemoryPool) {
    +        spillFromMemory();
    +        return true;
    +      }
    +    }
    +
    +    // Merge on-disk batches if we have too many.
    +
    +    int mergeCount = spilledRunsCount - maxMergeWidth;
    +    if (mergeCount <= 0) {
    +      return false;
    +    }
    +
    +    // We will merge. This will create yet another spilled
    +    // run. Account for that.
    +
    +    mergeCount += 1;
    +
    +    // Must merge at least 2 batches to make progress.
    +
    +    mergeCount = Math.max(2, mergeCount);
    +
    +    mergeCount = Math.min(mergeCount, maxMergeWidth);
    +
    +    // Do the merge, then loop to try again in case not
    +    // all the target batches spilled in one go.
    +
    +    logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
    +        mergeCount, allocator.getAllocatedMemory());
    +    mergeAndSpill(spilledRuns, mergeCount);
    +    return true;
    +  }
    +
    +  /**
    +   * This operator has accumulated a set of sorted incoming record batches.
    +   * We wish to spill some of them to disk. To do this, a "copier"
    +   * merges the target batches to produce a stream of new (merged) batches
    +   * which are then written to disk.
    +   * <p>
    +   * This method spills only half the accumulated batches
    +   * minimizing unnecessary disk writes. The exact count must lie between
    +   * the minimum and maximum spill counts.
    +    */
    +
    +  private void spillFromMemory() {
    +    int spillCount;
    +    if (spillFileSize == 0) {
    +      // If no spill limit given, guess half the batches.
    +
    +      spillCount = bufferedBatches.size() / 2;
    +    } else {
    +      // Spill file size given. Figure out how many
    +      // batches.
    +
    +      long estSize = 0;
    +      spillCount = 0;
    +      for (InputBatch batch : bufferedBatches) {
    +        estSize += batch.getDataSize();
    +        if (estSize > spillFileSize) {
    +          break; }
    +        spillCount++;
    +      }
    +    }
    +
    +    // Upper spill bound (optional)
    +
    +    spillCount = Math.min(spillCount, maxSpillLimit);
    +
    +    // Lower spill bound (at least 2, perhaps more)
    +
    +    spillCount = Math.max(spillCount, minSpillLimit);
    +
    +    // Should not happen, but just to be sure...
    +
    +    if (spillCount == 0) {
    +      return; }
    +
    +    // Do the actual spill.
    +
    +    logger.trace("Starting spill from memory. Memory = {}, Batch count = {}, Spill count = {}",
    +                 allocator.getAllocatedMemory(), bufferedBatches.size(), spillCount);
    +    mergeAndSpill(bufferedBatches, spillCount);
    +  }
    +
    +  private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
    +    if (count == 0) {
    +      return; }
    +    spilledRuns.add(doMergeAndSpill(source, count));
    +  }
    +
    +  private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> batchGroups, int spillCount) {
    +    List<BatchGroup> batchesToSpill = Lists.newArrayList();
    +    spillCount = Math.min(batchGroups.size(), spillCount);
    +    assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
    +    long spillSize = 0;
    +    for (int i = 0; i < spillCount; i++) {
    +      @SuppressWarnings("resource")
    +      BatchGroup batch = batchGroups.pollFirst();
    +      assert batch != null : "Encountered a null batch during merge and spill operation";
    +      batchesToSpill.add(batch);
    +      spillSize += batch.getDataSize();
    +    }
    +
    +    // Merge the selected set of batches and write them to the
    +    // spill file. After each write, we release the memory associated
    +    // with the just-written batch.
    +
    +    String outputFile = spillSet.getNextSpillFile();
    +    stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
    +    BatchGroup.SpilledRun newGroup = null;
    +    try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
    +         CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, outputBatchRecordCount)) {
    +      logger.trace("Merging and spilling to {}", outputFile);
    +      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext, spillSize);
    +
    +      // The copier will merge records from the buffered batches into
    +      // the outputContainer up to targetRecordCount number of rows.
    +      // The actual count may be less if fewer records are available.
    +
    +      while (merger.next()) {
    +
    +        // Add a new batch of records (given by merger.getOutput()) to the spill
    +        // file, opening the file if not yet open, and creating the target
    +        // directory if it does not yet exist.
    +        //
    +        // note that addBatch also clears the merger's output container
    +
    +        newGroup.addBatch(merger.getOutput());
    +      }
    +      injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
    +      newGroup.closeOutputStream();
    +      logger.trace("mergeAndSpill: completed, memory = {}, spilled {} records to {}",
    +                   allocator.getAllocatedMemory(), merger.getRecordCount(), outputFile);
    +      return newGroup;
    +    } catch (Throwable e) {
    +      // we only need to cleanup newGroup if spill failed
    +      try {
    +        if (newGroup != null) {
    +          AutoCloseables.close(e, newGroup);
    +        }
    +      } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
    +
    +      // Here the merger is holding onto a partially-completed batch.
    +      // It will release the memory in the close() call.
    +
    +      try {
    +        // Rethrow so we can organize how to handle the error.
    +
    +        throw e;
    +      }
    +
    +      // If error is a User Exception, just use as is.
    +
    +      catch (UserException ue) { throw ue; }
    +      catch (Throwable ex) {
    +        throw UserException.resourceError(ex)
    +              .message("External Sort encountered an error while spilling to disk")
    +              .build(logger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Allocate and initialize the selection vector used as the sort index.
    +   * Assumes that memory is available for the vector since memory management
    +   * ensured space is available.
    +   *
    +   * @return a new, populated selection vector 2
    +   */
    +
    +  private SelectionVector2 newSV2() {
    +    SelectionVector2 sv2 = new SelectionVector2(allocator);
    +    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
    +      throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
    +            .build(logger);
    +    }
    +    for (int i = 0; i < incoming.getRecordCount(); i++) {
    --- End diff --
    
    Is this initialization/population of the SV2 really needed? The sort() call
    later probably fills in the right values there.
    P.S. Thanks for cleaning this up; the original (non-managed) version of
    newSV2() is a mess.
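
For context on the question above, here is a minimal sketch in plain Java of why an index vector is usually pre-populated before an indirect sort. It assumes the sorter only compares through, and swaps, existing index entries; that is not confirmed here for SingleBatchSorter. The names IndexSortSketch, identityIndex and sortIndex are hypothetical, and plain int arrays stand in for the SV2.

```java
import java.util.Arrays;

public class IndexSortSketch {

  /** Fills the index with the identity mapping 0, 1, ..., n-1. */
  static int[] identityIndex(int recordCount) {
    int[] index = new int[recordCount];
    for (int i = 0; i < recordCount; i++) {
      index[i] = i;
    }
    return index;
  }

  /**
   * Insertion sort that compares values through the index and only swaps
   * index entries. It never writes a fresh record number into the index.
   */
  static void sortIndex(int[] values, int[] index) {
    for (int i = 1; i < index.length; i++) {
      for (int j = i; j > 0 && values[index[j - 1]] > values[index[j]]; j--) {
        int tmp = index[j];
        index[j] = index[j - 1];
        index[j - 1] = tmp;
      }
    }
  }

  public static void main(String[] args) {
    int[] values = {30, 10, 20};

    // Populated index: the sort permutes it into sorted order.
    int[] populated = identityIndex(values.length);
    sortIndex(values, populated);
    System.out.println(Arrays.toString(populated));   // prints [1, 2, 0]

    // Unpopulated (all-zero) index: every entry points at record 0,
    // so swapping entries can never produce a valid ordering.
    int[] unpopulated = new int[values.length];
    sortIndex(values, unpopulated);
    System.out.println(Arrays.toString(unpopulated)); // prints [0, 0, 0]
  }
}
```

If the sorter works this way, the population loop is required. If it instead writes every index entry itself, the loop would be redundant.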

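The arithmetic in updateMemoryEstimates() and isSpillNeeded() in the diff above can be traced with concrete numbers. The sketch below copies only the formulas shown there. The class name MemoryEstimateSketch and all input values (16 MiB preferred merge batch, 1 KiB records, 8 MiB input batch, 256 MiB memory limit) are assumptions chosen for illustration, and the buffered-batch limit check is left out.

```java
public class MemoryEstimateSketch {

  static final long MIB = 1024L * 1024L;

  /** Records per output batch: as many as fit, capped at Short.MAX_VALUE. */
  static int outputBatchRecordCount(long preferredMergeBatchSize, int recordSize) {
    int count = (int) Math.max(1, preferredMergeBatchSize / recordSize);
    return Math.min(count, Short.MAX_VALUE);
  }

  /** Output batch size: never below the preferred size (computed in long here). */
  static long outputBatchSize(int recordCount, int recordSize, long preferredMergeBatchSize) {
    return Math.max((long) recordCount * recordSize, preferredMergeBatchSize);
  }

  /** One worst-case input batch plus three output batches. */
  static long spillPoint(long inputBatchSize, long outputBatchSize) {
    return inputBatchSize + 3 * outputBatchSize;
  }

  /** Two merge batches and one output batch, doubled, plus one as margin. */
  static long minMergeMemory(long outputBatchSize) {
    return (2 * 3 + 1) * outputBatchSize;
  }

  /** Memory-driven part of the spill decision only. */
  static boolean isSpillNeeded(long memoryLimit, long allocated, long spillPoint, int bufferedBatches) {
    if (bufferedBatches < 2) {
      return false;
    }
    return memoryLimit - allocated <= spillPoint;
  }

  public static void main(String[] args) {
    int recordSize = 1024;             // assumed average record size
    long preferred = 16 * MIB;         // assumed preferredMergeBatchSize
    long inputBatch = 8 * MIB;         // assumed largest input batch seen
    long memoryLimit = 256 * MIB;      // assumed operator memory limit

    int count = outputBatchRecordCount(preferred, recordSize);
    long outSize = outputBatchSize(count, recordSize, preferred);
    long spill = spillPoint(inputBatch, outSize);

    System.out.println(count);                    // prints 16384
    System.out.println(outSize);                  // prints 16777216
    System.out.println(spill);                    // prints 58720256
    System.out.println(minMergeMemory(outSize));  // prints 117440512
    System.out.println(isSpillNeeded(memoryLimit, 220 * MIB, spill, 4)); // prints true
    System.out.println(isSpillNeeded(memoryLimit, 100 * MIB, spill, 4)); // prints false
  }
}
```

With these assumed inputs the spill point is 56 MiB, so a spill is triggered once free memory under the 256 MiB limit drops to 56 MiB or less.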

> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10.0
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that 
> works to a clearly-defined memory limit. Attached is a design specification 
> for the work.
> The project will include fixing a number of bugs related to the external 
> sort, included as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
