[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862994#comment-15862994 ]
ASF GitHub Bot commented on DRILL-5080: --------------------------------------- Github user amansinha100 commented on a diff in the pull request: https://github.com/apache/drill/pull/717#discussion_r100703575 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java --- @@ -0,0 +1,1456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.xsort.managed; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.physical.config.ExternalSort; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; +import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.SchemaUtil; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; + +import com.google.common.collect.Lists; + +/** + * External 
sort batch: a sort batch which can spill to disk in + * order to operate within a defined memory footprint. + * <p> + * <h4>Basic Operation</h4> + * The operator has three key phases: + * <p> + * <ul> + * <li>The load phase in which batches are read from upstream.</li> + * <li>The merge phase in which spilled batches are combined to + * reduce the number of files below the configured limit. (Best + * practice is to configure the system to avoid this phase.) + * <li>The delivery phase in which batches are combined to produce + * the final output.</li> + * </ul> + * During the load phase: + * <p> + * <ul> + * <li>The incoming (upstream) operator provides a series of batches.</li> + * <li>This operator sorts each batch, and accumulates them in an in-memory + * buffer.</li> + * <li>If the in-memory buffer becomes too large, this operator selects + * a subset of the buffered batches to spill.</li> + * <li>Each spill set is merged to create a new, sorted collection of + * batches, and each is spilled to disk.</li> + * <li>To allow the use of multiple disk storage, each spill group is written + * round-robin to a set of spill directories.</li> + * </ul> + * <p> + * Data is spilled to disk as a "run". A run consists of one or more (typically + * many) batches, each of which is itself a sorted run of records. + * <p> + * During the sort/merge phase: + * <p> + * <ul> + * <li>When the input operator is complete, this operator merges the accumulated + * batches (which may be all in memory or partially on disk), and returns + * them to the output (downstream) operator in chunks of no more than + * 64K records.</li> + * <li>The final merge must combine a collection of in-memory and spilled + * batches. Several limits apply to the maximum "width" of this merge. For + * example, each open spill run consumes a file handle, and we may wish + * to limit the number of file handles. 
Further, memory must hold one batch + * from each run, so we may need to reduce the number of runs so that the + * remaining runs can fit into memory. A consolidation phase combines + * in-memory and spilled batches prior to the final merge to control final + * merge width.</li> + * <li>A special case occurs if no batches were spilled. In this case, the input + * batches are sorted in memory without merging.</li> + * </ul> + * <p> + * Many complex details are involved in doing the above; the details are explained + * in the methods of this class. + * <p> + * <h4>Configuration Options</h4> + * <dl> + * <dt>drill.exec.sort.external.spill.fs</dt> + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd> + * <dt>drill.exec.sort.external.spill.directories</dt> + * <dd>The comma delimited list of directories, on the above file + * system, to which to spill files in round-robin fashion. The query will + * fail if any one of the directories becomes full.</dd> + * <dt>drill.exec.sort.external.spill.file_size</dt> + * <dd>Target size for first-generation spill files. Set this large + * enough to get nice long writes, but not so large that spill directories + * are overwhelmed.</dd> + * <dt>drill.exec.sort.external.mem_limit</dt> + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.batch_limit</dt> + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.max_count</dt> + * <dd>Maximum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.min_count</dt> + * <dd>Minimum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). 
(Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.merge_limit</dt> + * <dd>Sets the maximum number of runs to be merged in a single pass (limits + * the number of open files.)</dd> + * </dl> + * <p> + * The memory limit observed by this operator is the lesser of: + * <ul> + * <li>The maximum allocation allowed the allocator assigned to this batch + * as set by the Foreman, or</li> + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for + * testing.)</li> + * </ul> + * <h4>Output</h4> + * It is helpful to note that the sort operator will produce one of two kinds of + * output batches. + * <ul> + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses + * the entire in-memory sort set. A selection vector remover will copy results + * into new batches of a size determined by that operator.</li> + * <li>A series of batches, without a selection vector, if the sort spills to + * disk. In this case, the downstream operator will still be a selection vector + * remover, but there is nothing for that operator to remove. Each batch is + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li> + * </ul> + * Note that, even in the in-memory sort case, this operator could do the copying + * to eliminate the extra selection vector remover. That is left as an exercise + * for another time. + * <h4>Logging</h4> + * Logging in this operator serves two purposes: + * <ul> + * <li>Normal diagnostic information.</li> + * <li>Capturing the essence of the operator functionality for analysis in unit + * tests.</li> + * </ul> + * Test logging is designed to capture key events and timings. Take care + * when changing or removing log messages as you may need to adjust unit tests + * accordingly. 
+ */ + + public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class); + protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class); + + /** + * Smallest allowed output batch size. The smallest output batch + * created even under constrained memory conditions. + */ + private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024; + + /** + * The preferred amount of memory to set aside to output batches + * expressed as a ratio of available memory. + */ + + private static final float MERGE_BATCH_ALLOWANCE = 0.10F; + + /** + * In the bizarre case where the user gave us an unrealistically low + * spill file size, set a floor at some bare minimum size. (Note that, + * at this size, big queries will create a huge number of files, which + * is why the configuration default is on the order of hundreds of MB.) + */ + + private static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024; + + public static final String INTERRUPTION_AFTER_SORT = "after-sort"; + public static final String INTERRUPTION_AFTER_SETUP = "after-setup"; + public static final String INTERRUPTION_WHILE_SPILLING = "spilling"; + public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024; + public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024; + + private final RecordBatch incoming; + + /** + * Memory allocator for this operator itself. Incoming batches are + * transferred into this allocator. Intermediate batches used during + * merge also reside here. + */ + + private final BufferAllocator allocator; + + /** + * Schema of batches that this operator produces. 
+ */ + + private BatchSchema schema; + + private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList(); + private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList(); + private SelectionVector4 sv4; + + /** + * The number of records to add to each output batch sent to the + * downstream operator or spilled to disk. + */ + + private int mergeBatchRowCount; + private int peakNumBatches = -1; + + private long memoryLimit; + + /** + * Iterates over the final, sorted results. + */ + + private SortResults resultsIterator; + + /** + * Manages the set of spill directories and files. + */ + + private final SpillSet spillSet; + + /** + * Manages the copier used to merge a collection of batches into + * a new set of batches. + */ + + private final CopierHolder copierHolder; + + private enum SortState { START, LOAD, DELIVER, DONE } + private SortState sortState = SortState.START; + private int inputRecordCount = 0; + private int inputBatchCount = 0; // total number of batches received so far + private final OperatorCodeGenerator opCodeGen; + + /** + * Estimated size of the records for this query, updated on each + * new batch received from upstream. + */ + + private int estimatedRowWidth; + + /** + * Size of the merge batches that this operator produces. Generally + * the same as the merge batch size, unless low memory forces a smaller + * value. + */ + + private long targetMergeBatchSize; + + /** + * Estimate of the input batch size based on the largest batch seen + * thus far. + */ + private long estimatedInputBatchSize; + + /** + * Maximum number of batches to hold in memory. + * (Primarily for testing.) + */ + + private int bufferedBatchLimit; + private int mergeLimit; + private long spillFileSize; + private long minimumBufferSpace; + + /** + * Minimum memory level before spilling occurs. That is, we can buffer input + * batches in memory until we are down to the level given by the spill point. 
+ */ + + private long spillPoint; + private long mergeMemoryPool; + private long preferredMergeBatchSize; + private long bufferMemoryPool; + private boolean hasOversizeCols; + private long totalInputBytes; + private Long spillBatchSize; + private int maxDensity; + + /** + * Estimated number of rows that fit into a single spill batch. + */ + + private int spillBatchRowCount; + + // WARNING: The enum here is used within this class. But, the members of + // this enum MUST match those in the (unmanaged) ExternalSortBatch since + // that is the enum used in the UI to display metrics for the query profile. + + public enum Metric implements MetricDef { + SPILL_COUNT, // number of times operator spilled to disk + RETIRED1, // Was: peak value for totalSizeInMemory + // But operator already provides this value + PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory + MERGE_COUNT, // Number of second+ generation merges + MIN_BUFFER, // Minimum memory level observed in operation. + SPILL_MB; // Number of MB of data spilled to disk. This + // amount is first written, then later re-read. + // So, disk I/O is twice this amount. + + @Override + public int metricId() { + return ordinal(); + } + } + + /** + * Iterates over the final sorted results. Implemented differently + * depending on whether the results are in-memory or spilled to + * disk. 
+ */ + + public interface SortResults { + boolean next(); + void close(); + int getBatchCount(); + int getRecordCount(); + } + + public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) { + super(popConfig, context, true); + this.incoming = incoming; + allocator = oContext.getAllocator(); + opCodeGen = new OperatorCodeGenerator(context, popConfig); + + spillSet = new SpillSet(context, popConfig); + copierHolder = new CopierHolder(context, allocator, opCodeGen); + configure(context.getConfig()); + } + + private void configure(DrillConfig config) { + + // The maximum memory this operator can use as set by the + // operator definition (propagated to the allocator.) + + memoryLimit = allocator.getLimit(); + + // Optional configured memory limit, typically used only for testing. + + long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY); + if (configLimit > 0) { + memoryLimit = Math.min(memoryLimit, configLimit); + } + + // Optional limit on the number of buffered in-memory batches. + // 0 means no limit. Used primarily for testing. Must allow at least two + // batches or no merging can occur. + + bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2); + + // Optional limit on the number of spilled runs to merge in a single + // pass. Limits the number of open file handles. Must allow at least + // two batches to merge to make progress. + + mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2); + + // Limits the size of first-generation spill files. + + spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE); + + // Ensure the size is reasonable. + + spillFileSize = Math.max(spillFileSize, MIN_SPILL_FILE_SIZE); + + // The spill batch size. This is a critical setting for performance. + // Set too large and the ratio between memory and input data sizes becomes + // small. 
Set too small and disk seek times dominate performance. + + spillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE); + spillBatchSize = Math.max(spillBatchSize, MIN_SPILL_BATCH_SIZE); + + // Set the target output batch size. Use the maximum size, but only if + // this represents less than 10% of available memory. Otherwise, use 10% + // of memory, but no smaller than the minimum size. In any event, an + // output batch can contain no fewer than a single record. + + preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE); + long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE); + preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize); + preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE); + + logger.debug("Config: memory limit = {}, batch limit = {}, " + + "spill file size = {}, batch size = {}, merge limit = {}, merge batch size = {}", + memoryLimit, bufferedBatchLimit, spillFileSize, spillBatchSize, mergeLimit, + preferredMergeBatchSize); + } + + private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) { + int limit = config.getInt(paramName); + if (limit > 0) { + limit = Math.max(limit, minValue); + } else { + limit = valueIfZero; + } + return limit; + } + + @Override + public int getRecordCount() { + if (sv4 != null) { + return sv4.getCount(); + } + return container.getRecordCount(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + return sv4; + } + + private void closeBatchGroups(Collection<? 
extends BatchGroup> groups) { + for (BatchGroup group: groups) { + try { + group.close(); + } catch (Exception e) { + // collect all failure and make sure to cleanup all remaining batches + // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources() + // where it would have been passed to context.fail() + // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping + // right away, this will also make sure we collect any additional exception we may get while cleaning up + context.fail(e); + } + } + } + + /** + * Called by {@link AbstractRecordBatch} as a fast-path to obtain + * the first record batch and setup the schema of this batch in order + * to quickly return the schema to the client. Note that this method + * fetches the first batch from upstream which will be waiting for + * us the first time that {@link #innerNext()} is called. + */ + + @Override + public void buildSchema() { + IterOutcome outcome = next(incoming); + switch (outcome) { + case OK: + case OK_NEW_SCHEMA: + for (VectorWrapper<?> w : incoming) { + @SuppressWarnings("resource") + ValueVector v = container.addOrGet(w.getField()); + if (v instanceof AbstractContainerVector) { + w.getValueVector().makeTransferPair(v); // Can we remove this hack? + v.clear(); + } + v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO) + } + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(0); + break; + case STOP: + state = BatchState.STOP; + break; + case OUT_OF_MEMORY: + state = BatchState.OUT_OF_MEMORY; + break; + case NONE: + state = BatchState.DONE; + break; + default: + throw new IllegalStateException("Unexpected iter outcome: " + outcome); + } + } + + /** + * Process each request for a batch. The first request retrieves + * all the incoming batches and sorts them, optionally spilling to + * disk as needed. 
Subsequent calls retrieve the sorted results in + * fixed-size batches. + */ + + @Override + public IterOutcome innerNext() { + switch (sortState) { + case DONE: + return IterOutcome.NONE; + case START: + case LOAD: + return load(); + case DELIVER: + return nextOutputBatch(); + default: + throw new IllegalStateException("Unexpected sort state: " + sortState); + } + } + + private IterOutcome nextOutputBatch() { + if (resultsIterator.next()) { + return IterOutcome.OK; + } else { + logger.trace("Deliver phase complete: Returned {} batches, {} records", + resultsIterator.getBatchCount(), resultsIterator.getRecordCount()); + sortState = SortState.DONE; + return IterOutcome.NONE; + } + } + + /** + * Load and process a single batch, handling schema changes. In general, the + * external sort accepts only one schema. + * + * @return return code depending on the amount of data read from upstream + */ + + private IterOutcome loadBatch() { + + // If this is the very first batch, then AbstractRecordBatch + // already loaded it for us in buildSchema(). + + IterOutcome upstream; + if (sortState == SortState.START) { + sortState = SortState.LOAD; + upstream = IterOutcome.OK_NEW_SCHEMA; + } else { + upstream = next(incoming); + } + switch (upstream) { + case NONE: + case STOP: + return upstream; + case OK_NEW_SCHEMA: + case OK: + setupSchema(upstream); + + // Add the batch to the in-memory generation, spilling if + // needed. + + processBatch(); + break; + case OUT_OF_MEMORY: + + // Note: it is highly doubtful that this code actually works. It + // requires that the upstream batches got to a safe place to run + // out of memory and that no work was in-flight and thus abandoned. + // Consider removing this case once resource management is in place. 
+ + logger.debug("received OUT_OF_MEMORY, trying to spill"); + if (bufferedBatches.size() > 2) { + spillFromMemory(); + } else { + logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream"); + return IterOutcome.OUT_OF_MEMORY; + } + break; + default: + throw new IllegalStateException("Unexpected iter outcome: " + upstream); + } + return IterOutcome.OK; + } + + /** + * Load the results and sort them. May bail out early if an exceptional + * condition is passed up from the input batch. + * + * @return return code: OK_NEW_SCHEMA if rows were sorted, + * NONE if no rows + */ + + private IterOutcome load() { + logger.trace("Start of load phase"); + + // Clear the temporary container created by + // buildSchema(). + + container.clear(); + + // Loop over all input batches + + for (;;) { + IterOutcome result = loadBatch(); + + // None means all batches have been read. + + if (result == IterOutcome.NONE) { + break; } + + // Any outcome other than OK means something went wrong. + + if (result != IterOutcome.OK) { + return result; } + } + + // Anything to actually sort? + + if (inputRecordCount == 0) { + sortState = SortState.DONE; + return IterOutcome.NONE; + } + logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}", + inputBatchCount, spilledRuns.size(), totalInputBytes); + + // Do the merge of the loaded batches. The merge can be done entirely in memory if + // the results fit; else we have to do a disk-based merge of + // pre-sorted spilled batches. + + if (canUseMemoryMerge()) { + return sortInMemory(); + } else { + return mergeSpilledRuns(); + } + } + + /** + * All data has been read from the upstream batch. Determine if we + * can use a fast in-memory sort, or must use a merge (which typically, + * but not always, involves spilled batches.) 
+ * + * @return whether sufficient resources exist to do an in-memory sort + * if all batches are still in memory + */ + + private boolean canUseMemoryMerge() { + if (spillSet.hasSpilled()) { return false; } + + // Do we have enough memory for MSorter (the in-memory sorter)? + + long allocMem = allocator.getAllocatedMemory(); + long availableMem = memoryLimit - allocMem; + long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount); + if (availableMem < neededForInMemorySort) { return false; } + + // Make sure we don't exceed the maximum number of batches SV4 can address. + + if (bufferedBatches.size() > Character.MAX_VALUE) { return false; } + + // We can do an in-memory merge. + + return true; + } + + /** + * Handle a new schema from upstream. The ESB is quite limited in its ability + * to handle schema changes. + * + * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA + */ + + private void setupSchema(IterOutcome upstream) { + + // First batch: we won't have a schema. + + if (schema == null) { + schema = incoming.getSchema(); + + // Subsequent batches, nothing to do if same schema. + + } else if (upstream == IterOutcome.OK) { + return; + + // Only change in the case that the schema truly changes. Artificial schema changes are ignored. + + } else if (incoming.getSchema().equals(schema)) { + return; + } else if (unionTypeEnabled) { + schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema()); + + // New schema: must generate a new sorter and copier. + + opCodeGen.setSchema(schema); + } else { + throw UserException.unsupportedError() + .message("Schema changes not supported in External Sort. Please enable Union type.") + .build(logger); + } + + // Coerce all existing batches to the new schema. 
+ + for (BatchGroup b : bufferedBatches) { +// System.out.println("Before: " + allocator.getAllocatedMemory()); // Debug only + b.setSchema(schema); +// System.out.println("After: " + allocator.getAllocatedMemory()); // Debug only + } + for (BatchGroup b : spilledRuns) { + b.setSchema(schema); + } + } + + /** + * Convert an incoming batch into the agreed-upon format. (Also seems to + * make a persistent shallow copy of the batch saved until we are ready + * to sort or spill.) + * + * @return the converted batch, or null if the incoming batch is empty + */ + + private VectorContainer convertBatch() { + + // Must accept the batch even if no records. Then clear + // the vectors to release memory since we won't do any + // further processing with the empty batch. + + VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); + if (incoming.getRecordCount() == 0) { + for (VectorWrapper<?> w : convertedBatch) { + w.clear(); + } + return null; + } + return convertedBatch; + } + + private SelectionVector2 makeSelectionVector() { + if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { + return incoming.getSelectionVector2().clone(); + } else { + return newSV2(); + } + } + + /** + * Process the converted incoming batch by adding it to the in-memory store + * of data, or spilling data to disk when necessary. + */ + + @SuppressWarnings("resource") + private void processBatch() { + + // Skip empty batches (such as the first one.) + + if (incoming.getRecordCount() == 0) { + return; + } + + // Determine actual sizes of the incoming batch before taking + // ownership. Allows us to figure out if we need to spill first, + // to avoid overflowing memory simply due to ownership transfer. + + RecordBatchSizer sizer = analyzeIncomingBatch(); --- End diff -- Is calling analyzeIncomingBatch() absolutely needed if the schema did not change from one batch to next ? 
I suppose you could argue that a VARCHAR column average width could change substantially from one batch to next..is that the scenario we want to handle here ? > Create a memory-managed version of the External Sort operator > ------------------------------------------------------------- > > Key: DRILL-5080 > URL: https://issues.apache.org/jira/browse/DRILL-5080 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.8.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Labels: ready-to-commit > Fix For: 1.10.0 > > Attachments: ManagedExternalSortDesign.pdf > > > We propose to create a "managed" version of the external sort operator that > works to a clearly-defined memory limit. Attached is a design specification > for the work. > The project will include fixing a number of bugs related to the external > sort, include as sub-tasks of this umbrella task. -- This message was sent by Atlassian JIRA (v6.3.15#6346)