[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850667#comment-15850667 ]
ASF GitHub Bot commented on DRILL-5080: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/717#discussion_r99238315 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java --- @@ -0,0 +1,1321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.xsort.managed; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.physical.config.ExternalSort; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; +import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.SchemaUtil; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; + +import com.google.common.collect.Lists; + +/** + * External sort batch: a sort batch which can spill to disk in + * order to operate within a defined memory footprint. 
+ * <p> + * <h4>Basic Operation</h4> + * The operator has three key phases: + * <p> + * <ul> + * <li>The load phase in which batches are read from upstream.</li> + * <li>The merge phase in which spilled batches are combined to + * reduce the number of files below the configured limit. (Best + * practice is to configure the system to avoid this phase.)</li> + * <li>The delivery phase in which batches are combined to produce + * the final output.</li> + * </ul> + * During the load phase: + * <p> + * <ul> + * <li>The incoming (upstream) operator provides a series of batches.</li> + * <li>This operator sorts each batch, and accumulates them in an in-memory + * buffer.</li> + * <li>If the in-memory buffer becomes too large, this operator selects + * a subset of the buffered batches to spill.</li> + * <li>Each spill set is merged to create a new, sorted collection of + * batches, and each is spilled to disk.</li> + * <li>To allow the use of multiple disks, each spill group is written + * round-robin to a set of spill directories.</li> + * </ul> + * <p> + * During the sort/merge phase: + * <p> + * <ul> + * <li>When the input operator is complete, this operator merges the accumulated + * batches (which may be all in memory or partially on disk), and returns + * them to the output (downstream) operator in chunks of no more than + * 32K records.</li> + * <li>The final merge must combine a collection of in-memory and spilled + * batches. Several limits apply to the maximum "width" of this merge. For + * example, each open spill run consumes a file handle, and we may wish + * to limit the number of file handles. A consolidation phase combines + * in-memory and spilled batches prior to the final merge to control final + * merge width.</li> + * <li>A special case occurs if no batches were spilled. 
In this case, the input + * batches are sorted in memory without merging.</li> + * </ul> + * <p> + * Many complex details are involved in doing the above; the details are explained + * in the methods of this class. + * <p> + * <h4>Configuration Options</h4> + * <dl> + * <dt>drill.exec.sort.external.spill.fs</dt> + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd> + * <dt>drill.exec.sort.external.spill.directories</dt> + * <dd>The (comma? space?) separated list of directories, on the above file + * system, to which to spill files in round-robin fashion. The query will + * fail if any one of the directories becomes full.</dd> + * <dt>drill.exec.sort.external.spill.file_size</dt> + * <dd>Target size for first-generation spill files. Set this large + * enough to get nice long writes, but not so large that spill directories + * are overwhelmed.</dd> + * <dt>drill.exec.sort.external.mem_limit</dt> + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.batch_limit</dt> + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.max_count</dt> + * <dd>Maximum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.min_count</dt> + * <dd>Minimum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). 
(Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.merge_limit</dt> + * <dd>Sets the maximum number of runs to be merged in a single pass (limits + * the number of open files.)</dd> + * </dl> + * <p> + * The memory limit observed by this operator is the lesser of: + * <ul> + * <li>The maximum allocation allowed by the allocator assigned to this batch, or</li> + * <li>The maximum limit set for this operator by the Foreman, or</li> + * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for + * testing.)</li> + * </ul> + * <h4>Output</h4> + * It is helpful to note that the sort operator will produce one of two kinds of + * output batches. + * <ul> + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses + * the entire in-memory sort set. A selection vector remover will copy results + * into new batches of a size determined by that operator.</li> + * <li>A series of batches, without a selection vector, if the sort spills to + * disk. In this case, the downstream operator will still be a selection vector + * remover, but there is nothing for that operator to remove. Each batch is + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li> + * </ul> + * Note that, even in the in-memory sort case, this operator could do the copying + * to eliminate the extra selection vector remover. That is left as an exercise + * for another time. + * <h4>Logging</h4> + * Logging in this operator serves two purposes: + * <ul> + * <li>Normal diagnostic information.</li> + * <li>Capturing the essence of the operator functionality for analysis in unit + * tests.</li> + * </ul> + * Test logging is designed to capture key events and timings. Take care + * when changing or removing log messages as you may need to adjust unit tests + * accordingly. 
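As a reviewer's aside, the "lesser of" memory-limit rule described above can be checked with a small standalone sketch. The class and method names below are illustrative only (not part of the patch); the actual logic lives in ExternalSortBatch.configure().

```java
// Standalone sketch of the "lesser of" memory-limit rule: the effective
// limit is min(operator limit from the Foreman/plan, allocator limit),
// further capped by the optional (test-only) mem_limit config value
// when that value is non-zero.
public class MemoryLimitSketch {

    static long effectiveLimit(long opMaxAllocation, long allocatorLimit, long configLimit) {
        long limit = Math.min(opMaxAllocation, allocatorLimit);
        if (configLimit > 0) {  // 0 means "not set"
            limit = Math.min(limit, configLimit);
        }
        return limit;
    }

    public static void main(String[] args) {
        // Foreman caps the operator at 2 GB, the allocator allows 10 GB,
        // and the test-only config limit is unset: the 2 GB cap wins.
        System.out.println(effectiveLimit(2L << 30, 10L << 30, 0));
    }
}
```

The configure() method in the patch applies this same min-of-three rule before computing the spill point and merge batch sizes.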
+ */ + + public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class); + protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class); + + private final RecordBatch incoming; + + /** + * Memory allocator for this operator itself. Incoming batches are + * transferred into this allocator. Intermediate batches used during + * merge also reside here. + */ + + private final BufferAllocator allocator; + + /** + * Schema of batches that this operator produces. + */ + + private BatchSchema schema; + + private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList(); + private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList(); + private SelectionVector4 sv4; + + /** + * The number of records to add to each output batch sent to the + * downstream operator or spilled to disk. + */ + + private int outputBatchRecordCount; + private int peakNumBatches = -1; + + /** + * The copier uses this limit ({@link #MAX_MERGED_BATCH_SIZE}) to estimate the target + * number of records to return in each batch. + * <p> + * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}. + * <p> + * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a + * design flaw that may give rise to fatal memory fragmentation if we allow it to + * allocate larger vectors. + */ + + private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024; + + /** + * Smallest allowed output batch size: the size of the output batch + * created even under constrained memory conditions. + */ + private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024; + + /** + * The preferred amount of memory to set aside for output batches, + * expressed as a ratio of available memory. 
+ */ + + private static final float MERGE_BATCH_ALLOWANCE = 0.10F; + + public static final String INTERRUPTION_AFTER_SORT = "after-sort"; + public static final String INTERRUPTION_AFTER_SETUP = "after-setup"; + public static final String INTERRUPTION_WHILE_SPILLING = "spilling"; + + private long memoryLimit; + + /** + * Iterates over the final, sorted results. + */ + + private SortResults resultsIterator; + + /** + * Manages the set of spill directories and files. + */ + + private final SpillSet spillSet; + + /** + * Manages the copier used to merge a collection of batches into + * a new set of batches. + */ + + private final CopierHolder copierHolder; + + private enum SortState { START, LOAD, DELIVER, DONE } + private SortState sortState = SortState.START; + private int inputRecordCount = 0; + private int inputBatchCount = 0; // total number of batches received so far + private final OperatorCodeGenerator opCodeGen; + + /** + * Estimated size of the records for this query, updated on each + * new batch received from upstream. + */ + + private int estimatedRecordSize; + + /** + * Estimated size of the spill and output batches that this + * operator produces, estimated from the estimated record + * size. + */ + + private long estimatedOutputBatchSize; + private long estimatedInputBatchSize; + + /** + * Maximum number of batches to hold in memory. + * (Primarily for testing.) + */ + + private int bufferedBatchLimit; + private int mergeLimit; + private int minSpillLimit; + private int maxSpillLimit; + private long spillFileSize; + private long minimumBufferSpace; + + /** + * Minimum memory level before spilling occurs. That is, we can buffer input + * batches in memory until we are down to the level given by the spill point. + */ + + private long spillPoint; + private long mergeMemoryPool; + private long preferredMergeBatchSize; + + // WARNING: The enum here is used within this class. 
But, the members of + // this enum MUST match those in the (unmanaged) ExternalSortBatch since + // that is the enum used in the UI to display metrics for the query profile. + + public enum Metric implements MetricDef { + SPILL_COUNT, // number of times operator spilled to disk + RETIRED1, // Was: peak value for totalSizeInMemory + // But operator already provides this value + PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory + MERGE_COUNT, // Number of second+ generation merges + MIN_BUFFER, // Minimum memory level observed in operation. + INPUT_BATCHES; // Number of batches read from upstream. + + @Override + public int metricId() { + return ordinal(); + } + } + + /** + * Iterates over the final sorted results. Implemented differently + * depending on whether the results are in-memory or spilled to + * disk. + */ + + public interface SortResults { + boolean next(); + void close(); + int getBatchCount(); + int getRecordCount(); + } + + public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) { + super(popConfig, context, true); + this.incoming = incoming; + allocator = oContext.getAllocator(); + opCodeGen = new OperatorCodeGenerator(context, popConfig); + + spillSet = new SpillSet(context, popConfig); + copierHolder = new CopierHolder(context, allocator, opCodeGen); + configure(context.getConfig()); + } + + private void configure(DrillConfig config) { + + // The maximum memory this operator can use. It is either the + // limit set on the allocator or on the operator, whichever is + // less. + + memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit()); + + // Optional configured memory limit, typically used only for testing. + + long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY); + if (configLimit > 0) { + memoryLimit = Math.min(memoryLimit, configLimit); + } + + // Optional limit on the number of buffered in-memory batches. + // 0 means no limit. 
Used primarily for testing. Must allow at least two + // batches or no merging can occur. + + bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2); + + // Optional limit on the number of spilled runs to merge in a single + // pass. Limits the number of open file handles. Must allow at least + // two batches to merge to make progress. + + mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2); + + // Limits on the minimum and maximum buffered batches to spill per + // spill event. + + minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2); + maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2); + if (minSpillLimit > maxSpillLimit) { + minSpillLimit = Math.min(minSpillLimit, maxSpillLimit); + maxSpillLimit = minSpillLimit; + } + + // Limits the size of first-generation spill files. + + spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE); + + // Set the target output batch size. Use the maximum size, but only if + // this represents less than 10% of available memory. Otherwise, use 10% + // of memory, but no smaller than the minimum size. In any event, an + // output batch can contain no fewer than a single record. 
+ + long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE); + preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE); + preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE); + + logger.debug("Config: memory limit = {}, batch limit = {}, " + + "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}", + memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit, + preferredMergeBatchSize); + } + + private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) { + int limit = config.getInt(paramName); + if (limit > 0) { + limit = Math.max(limit, minValue); + } else { + limit = valueIfZero; + } + return limit; + } + + @Override + public int getRecordCount() { + if (sv4 != null) { + return sv4.getCount(); + } + return container.getRecordCount(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + return sv4; + } + + private void closeBatchGroups(Collection<? 
extends BatchGroup> groups) { + for (BatchGroup group: groups) { + try { + group.close(); + } catch (Exception e) { + // Collect all failures and make sure to clean up all remaining batches. + // Originally we would have thrown a RuntimeException that would propagate to + // FragmentExecutor.closeOutResources(), where it would have been passed to context.fail(). + // Passing the exception directly to context.fail(e) lets the cleanup process continue + // instead of stopping right away, and also ensures we collect any additional exceptions + // raised while cleaning up. + context.fail(e); + } + } + } + + @Override + public void close() { + try { + if (bufferedBatches != null) { + closeBatchGroups(bufferedBatches); + bufferedBatches = null; + } + if (spilledRuns != null) { + closeBatchGroups(spilledRuns); + spilledRuns = null; + } + } finally { + if (sv4 != null) { + sv4.clear(); + } + if (resultsIterator != null) { + resultsIterator.close(); + } + copierHolder.close(); + spillSet.close(); + opCodeGen.close(); + + // The call to super.close() clears out the output container. + // Doing so requires the allocator here, so it must be closed + // after the super call. + + super.close(); + allocator.close(); + } + } + + /** + * Called by {@link AbstractRecordBatch} as a fast path to obtain + * the first record batch and set up the schema of this batch in order + * to quickly return the schema to the client. Note that this method + * fetches the first batch from upstream, which will be waiting for + * us the first time that {@link #innerNext()} is called. + */ + + @Override + public void buildSchema() { + IterOutcome outcome = next(incoming); + switch (outcome) { + case OK: + case OK_NEW_SCHEMA: + for (VectorWrapper<?> w : incoming) { + @SuppressWarnings("resource") + ValueVector v = container.addOrGet(w.getField()); + if (v instanceof AbstractContainerVector) { + w.getValueVector().makeTransferPair(v); // Can we remove this hack? 
+ v.clear(); + } + v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO) + } + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(0); + break; + case STOP: + state = BatchState.STOP; + break; + case OUT_OF_MEMORY: + state = BatchState.OUT_OF_MEMORY; + break; + case NONE: + state = BatchState.DONE; + break; + default: + break; + } + } + + /** + * Process each request for a batch. The first request retrieves + * all incoming batches and sorts them, optionally spilling to + * disk as needed. Subsequent calls retrieve the sorted results in + * fixed-size batches. + */ + + @Override + public IterOutcome innerNext() { + switch (sortState) { + case DONE: + return IterOutcome.NONE; + case START: + case LOAD: + return load(); + case DELIVER: + return nextOutputBatch(); + default: + throw new IllegalStateException("Unexpected sort state: " + sortState); + } + } + + private IterOutcome nextOutputBatch() { + if (resultsIterator.next()) { + return IterOutcome.OK; + } else { + logger.trace("Deliver phase complete: Returned {} batches, {} records", + resultsIterator.getBatchCount(), resultsIterator.getRecordCount()); + sortState = SortState.DONE; + return IterOutcome.NONE; + } + } + + /** + * Load and process a single batch, handling schema changes. In general, the + * external sort accepts only one schema. It can handle compatible schemas + * (which seems to mean the same columns in possibly different orders.) + * + * @return return code depending on the amount of data read from upstream + */ + + private IterOutcome loadBatch() { + + // If this is the very first batch, then AbstractRecordBatch + // already loaded it for us in buildSchema(). 
+ + IterOutcome upstream; + if (sortState == SortState.START) { + sortState = SortState.LOAD; + upstream = IterOutcome.OK_NEW_SCHEMA; + } else { + upstream = next(incoming); + } + switch (upstream) { + case NONE: + return upstream; + case NOT_YET: + throw new UnsupportedOperationException(); + case STOP: + return upstream; + case OK_NEW_SCHEMA: + case OK: + setupSchema(upstream); + + // Add the batch to the in-memory generation, spilling if + // needed. + + processBatch(); + break; + case OUT_OF_MEMORY: + + // Note: it is highly doubtful that this code actually works. It + // requires that the upstream batches got to a safe place to run + // out of memory and that no work was in-flight and thus abandoned. + // Consider removing this case once resource management is in place. + + logger.debug("received OUT_OF_MEMORY, trying to spill"); + if (bufferedBatches.size() > 2) { + spillFromMemory(); + } else { + logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream"); + return IterOutcome.OUT_OF_MEMORY; + } + break; + default: + throw new UnsupportedOperationException(); + } + return IterOutcome.OK; + } + + /** + * Load the results and sort them. May bail out early if an exceptional + * condition is passed up from the input batch. + * + * @return return code: OK_NEW_SCHEMA if rows were sorted, + * NONE if no rows + */ + + private IterOutcome load() { + logger.trace("Start of load phase"); + + // Clear the temporary container created by + // buildSchema(). + + container.clear(); + + // Loop over all input batches + + for (;;) { + IterOutcome result = loadBatch(); + + // NONE means all batches have been read. + + if (result == IterOutcome.NONE) { + break; } + + // Any outcome other than OK means something went wrong. + + if (result != IterOutcome.OK) { + return result; } + } + + // Anything to actually sort? 
+ + if (inputRecordCount == 0) { + sortState = SortState.DONE; + return IterOutcome.NONE; + } + logger.trace("Completed load phase: read {} batches", inputBatchCount); + + // Do the merge of the loaded batches. The merge can be done entirely in memory if + // the results fit; else we have to do a disk-based merge of + // pre-sorted spilled batches. + + if (canUseMemoryMerge()) { + return sortInMemory(); + } else { + return mergeSpilledRuns(); + } + } + + /** + * All data has been read from the upstream batch. Determine if we + * can use a fast in-memory sort, or must use a merge (which typically, + * but not always, involves spilled batches.) + * + * @return whether sufficient resources exist to do an in-memory sort + * if all batches are still in memory + */ + + private boolean canUseMemoryMerge() { + if (spillSet.hasSpilled()) { return false; } + + // Do we have enough memory for MSorter (the in-memory sorter)? + + long allocMem = allocator.getAllocatedMemory(); + long availableMem = memoryLimit - allocMem; + long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount); + if (availableMem < neededForInMemorySort) { return false; } + + // Make sure we don't exceed the maximum number of batches SV4 can address. + + if (bufferedBatches.size() > Character.MAX_VALUE) { return false; } + + // We can do an in-memory merge. + + return true; + } + + /** + * Handle a new schema from upstream. The ESB is quite limited in its ability + * to handle schema changes. + * + * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA + */ + + private void setupSchema(IterOutcome upstream) { + + // First batch: we won't have a schema. + + if (schema == null) { + schema = incoming.getSchema(); + + // Subsequent batches, nothing to do if same schema. + + } else if (upstream == IterOutcome.OK) { + return; + + // Only change in the case that the schema truly changes. Artificial schema changes are ignored. 
+ + } else if (incoming.getSchema().equals(schema)) { + return; + } else if (unionTypeEnabled) { + schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema()); + + // New schema: must generate a new sorter and copier. + + opCodeGen.setSchema(schema); + } else { + throw UserException.unsupportedError() + .message("Schema changes not supported in External Sort. Please enable Union type.") + .build(logger); + } + + // Coerce all existing batches to the new schema. + + for (BatchGroup b : bufferedBatches) { + b.setSchema(schema); + } + for (BatchGroup b : spilledRuns) { + b.setSchema(schema); + } + } + + /** + * Convert an incoming batch into the agreed-upon format. (Also seems to + * make a persistent shallow copy of the batch, saved until we are ready + * to sort or spill.) + * + * @return the converted batch, or null if the incoming batch is empty + */ + + private VectorContainer convertBatch() { + + // Must accept the batch even if no records. Then clear + // the vectors to release memory since we won't do any + // further processing with the empty batch. + + VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); + if (incoming.getRecordCount() == 0) { + for (VectorWrapper<?> w : convertedBatch) { + w.clear(); + } + return null; + } + return convertedBatch; + } + + private SelectionVector2 makeSelectionVector() { + if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { + return incoming.getSelectionVector2().clone(); + } else { + return newSV2(); + } + } + + /** + * Process the converted incoming batch by adding it to the in-memory store + * of data, or spilling data to disk when necessary. 
+ */ + + @SuppressWarnings("resource") + private void processBatch() { + + // The heart of the external sort operator: spill to disk when + // the in-memory generation exceeds the allowed memory limit. + // Preemptively spill BEFORE accepting the new batch into our memory + // pool. The allocator will throw an OOM exception if we accept the + // batch when we are near the limit - despite the fact that the batch + // is already in memory and no new memory is allocated during the transfer. + + if (isSpillNeeded()) { + spillFromMemory(); + } + + // Sanity check. We should now be above the spill point. + + long startMem = allocator.getAllocatedMemory(); + if (memoryLimit - startMem < spillPoint) { + logger.error( "ERROR: Failed to spill below the spill point. Spill point = {}, free memory = {}", + spillPoint, memoryLimit - startMem); + } + + // Convert the incoming batch to the agreed-upon schema. + // No converted batch means we got an empty input batch. + // Converting the batch transfers memory ownership to our + // allocator. This gives a round-about way to learn the batch + // size: check the before and after memory levels, then use + // the difference as the batch size, in bytes. + + VectorContainer convertedBatch = convertBatch(); + if (convertedBatch == null) { + return; + } + + SelectionVector2 sv2 = makeSelectionVector(); + + // Compute batch size, including allocation of an sv2. + + long endMem = allocator.getAllocatedMemory(); + long batchSize = endMem - startMem; + int count = sv2.getCount(); + inputRecordCount += count; + inputBatchCount++; + stats.setLongStat(Metric.INPUT_BATCHES, inputBatchCount); + + // Update the minimum buffer space metric. 
+ + if (minimumBufferSpace == 0) { + minimumBufferSpace = endMem; + } else { + minimumBufferSpace = Math.min(minimumBufferSpace, endMem); + } + stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace); + + // Update the size based on the actual record count, not + // the effective count as given by the selection vector + // (which may exclude some records due to filtering.) + + updateMemoryEstimates(batchSize, convertedBatch.getRecordCount()); + + // Sort the incoming batch using either the original selection vector, + // or a new one created here. + + SingleBatchSorter sorter; + sorter = opCodeGen.getSorter(convertedBatch); + try { + sorter.setup(context, sv2, convertedBatch); + } catch (SchemaChangeException e) { + convertedBatch.clear(); + throw UserException.unsupportedError(e) + .message("Unexpected schema change.") + .build(logger); + } + try { + sorter.sort(sv2); + } catch (SchemaChangeException e) { + throw UserException.unsupportedError(e) + .message("Unexpected schema change.") + .build(logger); + } + RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator); + try { + rbd.setSv2(sv2); + bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, batchSize)); + if (peakNumBatches < bufferedBatches.size()) { + peakNumBatches = bufferedBatches.size(); + stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches); + } + + } catch (Throwable t) { + rbd.clear(); + throw t; + } + } + + /** + * Update the data-driven memory use numbers including: + * <ul> + * <li>The average size of incoming records.</li> + * <li>The estimated spill and output batch size.</li> + * <li>The estimated number of average-size records per + * spill and output batch.</li> + * <li>The amount of memory set aside to hold the incoming + * batches before spilling starts.</li> + * </ul> + * + * @param actualBatchSize the overall size of the current batch received from + * upstream + * @param actualRecordCount the number of actual (not filtered) 
records in + * that upstream batch + */ + + private void updateMemoryEstimates(long actualBatchSize, int actualRecordCount) { + + // The record count should never be zero, but better safe than sorry... + + if (actualRecordCount == 0) { + return; } + + // We know the batch size and number of records. Use that to estimate + // the average record size. Since a typical batch has many records, + // the average size is a fairly good estimator. Note that the batch + // size includes not just the actual vector data, but any unused space + // resulting from power-of-two allocation. This means that we don't + // have to do size adjustments for input batches as we will do below + // when estimating the size of other objects. + + int batchRecordSize = (int) (actualBatchSize / actualRecordCount); + + // Record sizes may vary across batches. To be conservative, use + // the largest size observed from incoming batches. + + int origEstimate = estimatedRecordSize; + estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize); + + // Go no further if nothing changed. + + if (estimatedRecordSize == origEstimate) { + return; } + + // Maintain an estimate of the incoming batch size: the largest + // batch yet seen. Used to reserve memory for the next incoming + // batch. + + estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize); + + // Estimate the input record count. Add one due to rounding. + + long estimatedInputRecordCount = estimatedInputBatchSize / estimatedRecordSize + 1; + + // Estimate the total size of each incoming batch plus sv2. Note that, due + // to power-of-two rounding, the allocated size might be twice the data size. + + long estimatedInputSize = estimatedInputBatchSize + 4 * estimatedInputRecordCount; + + // Determine the number of records to spill per merge step. The goal is to + // spill batches of either 32K records, or as many records as fit into the + // amount of memory dedicated to each batch, whichever is less. 
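The "32K records or whatever fits, whichever is less" rule described in the comment above can be illustrated with a standalone sketch. The class name is hypothetical; the real computation is the outputBatchRecordCount assignment in updateMemoryEstimates().

```java
// Illustration of the output-batch sizing rule: target either the SV2
// record limit (Short.MAX_VALUE, roughly 32K records) or as many
// average-size records as fit in the preferred merge batch size,
// whichever is smaller, and never fewer than one record.
public class BatchSizingSketch {

    static int outputBatchRecordCount(long preferredMergeBatchSize, int estimatedRecordSize) {
        int count = (int) Math.max(1, preferredMergeBatchSize / estimatedRecordSize);
        return Math.min(count, Short.MAX_VALUE);
    }

    public static void main(String[] args) {
        // 16 MB preferred batch, 100-byte records: 167,772 records would fit,
        // so the 32,767-record cap applies instead.
        System.out.println(outputBatchRecordCount(16 * 1024 * 1024, 100));
        // With 4 KB records, only 4,096 fit, well under the cap.
        System.out.println(outputBatchRecordCount(16 * 1024 * 1024, 4096));
    }
}
```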
+ + outputBatchRecordCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRecordSize); + outputBatchRecordCount = Math.min(outputBatchRecordCount, Short.MAX_VALUE); + + // Compute the estimated size of batches that this operator creates. + // Note that this estimate DOES NOT apply to incoming batches as we have + // no control over those. Note that the output batch size should be about + // the same as the MAX_MERGED_BATCH_SIZE constant used to create the + // estimated record count. But, it can be bigger if we have jumbo sized + // records larger than the target output size. + + estimatedOutputBatchSize = outputBatchRecordCount * estimatedRecordSize; + estimatedOutputBatchSize = Math.max(estimatedOutputBatchSize, preferredMergeBatchSize); + + // Determine the minimum memory needed for spilling. Spilling is done just + // before accepting a batch, so we must spill if we don't have room for a + // (worst case) input batch. To spill, we need room for the output batch created + // by merging the batches already in memory. Double this to allow for power-of-two + // memory allocations, then add one more as a margin of safety. + + spillPoint = estimatedInputBatchSize + 3 * estimatedOutputBatchSize; + + // Determine the minimum total memory we would need to receive two input + // batches (the minimum needed to make progress) and the allowance for the + // output batch. + + long minLoadMemory = spillPoint + estimatedInputSize; + + // The merge memory pool assumes we can spill all input batches. To make + // progress, we must have at least two merge batches (same size as an output + // batch) and one output batch. Again, double to allow for power-of-two + // allocation and add one for a margin of error. + + long minMergeMemory = (2*3 + 1) * estimatedOutputBatchSize; + + // Determine how much memory can be used to hold in-memory batches of spilled + // runs when reading from disk. 
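The merge-memory-pool arithmetic described above works out to the following standalone sketch (the class name is hypothetical; the constants mirror the surrounding code):

```java
// Sketch of the merge-memory-pool sizing: reserve room for three output
// batches, then give 95% of the remaining memory to in-memory batches of
// spilled runs, but never less than the (2*3 + 1) output-batch minimum
// (two merge inputs plus one output, doubled for power-of-two allocation,
// plus one as a margin of error).
public class MergePoolSketch {

    static long mergeMemoryPool(long memoryLimit, long estimatedOutputBatchSize) {
        long minMergeMemory = (2 * 3 + 1) * estimatedOutputBatchSize;
        return Math.max(minMergeMemory,
            (long) ((memoryLimit - 3 * estimatedOutputBatchSize) * 0.95));
    }

    public static void main(String[] args) {
        // A 256 MB limit with 16 MB output batches leaves roughly 198 MB
        // for the merge pool.
        System.out.println(mergeMemoryPool(256L << 20, 16L << 20));
        // A tiny 64 MB limit falls back to the 7-output-batch floor (112 MB).
        System.out.println(mergeMemoryPool(64L << 20, 16L << 20));
    }
}
```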
+
+    mergeMemoryPool = Math.max(minMergeMemory,
+        (long) ((memoryLimit - 3 * estimatedOutputBatchSize) * 0.95));
+
+    // Sanity check: if we've been given too little memory to make progress,
+    // issue a warning but proceed anyway. Should only occur if something is
+    // configured terribly wrong.
+
+    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
+    if (minMemoryNeeds > memoryLimit) {
+      logger.warn("updateMemoryEstimates: potential memory overflow! " +
+          "Minimum needed = {} bytes, actual available = {} bytes",
+          minMemoryNeeds, memoryLimit);
+    }
+
+    // Log the calculated values. Turn this on if things seem amiss.
+    // Message will appear only when the values change.
+
+    logger.debug("Memory Estimates: record size = {} bytes; input batch = {} bytes, {} records; " +
+        "output batch size = {} bytes, {} records; " +
+        "Available memory: {}, spill point = {}, min. merge memory = {}",
+        estimatedRecordSize, estimatedInputBatchSize, estimatedInputRecordCount,
+        estimatedOutputBatchSize, outputBatchRecordCount,
+        memoryLimit, spillPoint, minMergeMemory);
+  }
+
+  /**
+   * Determine if a spill is needed before receiving the new record batch.
+   * Spilling is driven purely by memory availability (and an optional
+   * batch limit for testing.)
+   *
+   * @return true if spilling is needed, false otherwise
+   */
+
+  private boolean isSpillNeeded() {
+
+    // Can't spill if there are fewer than two batches, else the merge
+    // can't make progress.
+
+    if (bufferedBatches.size() < 2) {
+      return false; }
+
+    // Must spill if we are below the spill point (the amount of memory
+    // needed to do the minimal spill.)
+
+    long freeMemory = memoryLimit - allocator.getAllocatedMemory();
+
+    // Sanity check. If the memory goes negative, the calcs are off.
+
+    if (freeMemory < 0) {
+      logger.error("ERROR: Free memory is negative: {}. " +
+          "Spill point = {}",
+          freeMemory, spillPoint);
+    }
+    if (freeMemory <= spillPoint) {
+      return true; }
+
+    // Spill if the number of batches (BatchGroups) accumulated since the
+    // last spill exceeds the configured batch limit.
+
+    return bufferedBatches.size() > bufferedBatchLimit;
+  }
+
+  /**
+   * Perform an in-memory sort of the buffered batches. Obviously can
+   * be used only for the non-spilling case.
+   *
+   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
+   */
+
+  private IterOutcome sortInMemory() {
+    logger.info("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
+        bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
+
+    // Note the difference between how we handle batches here and in the spill/merge
+    // case. In the spill/merge case, this class decides on the batch size to send
+    // downstream. However, in the in-memory case, we must pass along all batches
+    // in a single SV4. Attempts to do paging will result in errors. In the in-memory
+    // merge case, the downstream Selection Vector Remover will split the one
+    // big SV4 into multiple smaller batches to send further downstream.
+
+    // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
+    // by closing the resultsIterator after all results are returned downstream.
+
+    InMemorySorter memoryMerge = new InMemorySorter(context, allocator, opCodeGen);
+    try {
+      sv4 = memoryMerge.sort(bufferedBatches, this, container);
+      if (sv4 == null) {
+        sortState = SortState.DONE;
+        return IterOutcome.STOP;
+      } else {
+        logger.info("Completed in-memory sort. Memory = {}",
+            allocator.getAllocatedMemory());
+        resultsIterator = memoryMerge;
+        memoryMerge = null;
+        sortState = SortState.DELIVER;
+        return IterOutcome.OK_NEW_SCHEMA;
+      }
+    } finally {
+      if (memoryMerge != null) {
+        memoryMerge.close();
+      }
+    }
+  }
+
+  /**
+   * Perform merging of (typically spilled) batches.
+   * First consolidates batches
+   * as needed, then performs a final merge that is read one batch at a time
+   * to deliver batches to the downstream operator.
+   *
+   * @return always returns OK_NEW_SCHEMA
+   */
+
+  private IterOutcome mergeSpilledRuns() {
+    logger.info("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
+        inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
+        bufferedBatches.size(), spilledRuns.size());
+
+    // Consolidate batches to a number that can be merged in
+    // a single last pass.
+
+    int mergeCount = 0;
+    while (consolidateBatches()) {
+      mergeCount++;
+    }
+    stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
+
+    // Merge in-memory batches and spilled runs for the final merge.
+
+    List<BatchGroup> allBatches = new LinkedList<>();
+    allBatches.addAll(bufferedBatches);
+    bufferedBatches.clear();
+    allBatches.addAll(spilledRuns);
+    spilledRuns.clear();
+
+    logger.info("Starting merge phase. Runs = {}, Alloc. memory = {}", allBatches.size(), allocator.getAllocatedMemory());
+
+    // Do the final merge as a results iterator.
+
+    CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, allBatches, container, outputBatchRecordCount);
+    merger.next();
+    resultsIterator = merger;
+    sortState = SortState.DELIVER;
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  private boolean consolidateBatches() {
+
+    // Determine additional memory needed to hold one batch from each
+    // spilled run.
+
+    int inMemCount = bufferedBatches.size();
+    int spilledRunsCount = spilledRuns.size();
+
+    // Can't merge more than will fit into memory at one time.
+
+    int maxMergeWidth = (int) (mergeMemoryPool / estimatedOutputBatchSize);
+    maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
+
+    // If we can't fit all batches in memory, we must spill some in-memory
+    // batches to make room for multiple spill-merge-spill cycles.
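+    // Illustration (hypothetical numbers): with mergeMemoryPool = 200 MB
+    // and estimatedOutputBatchSize = 16 MB, maxMergeWidth = 12, further
+    // limited by the configured mergeLimit.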
+
+    if (inMemCount > 0) {
+      if (spilledRunsCount > maxMergeWidth) {
+        spillFromMemory();
+        return true;
+      }
+
+      // If we just plain have too many batches to merge, spill some
+      // in-memory batches to reduce the burden.
+
+      if (inMemCount + spilledRunsCount > mergeLimit) {
+        spillFromMemory();
+        return true;
+      }
+
+      // If the on-disk batches and in-memory batches need more memory than
+      // is available, spill some in-memory batches.
+
+      long allocated = allocator.getAllocatedMemory();
+      long totalNeeds = spilledRunsCount * estimatedOutputBatchSize + allocated;
+      if (totalNeeds > mergeMemoryPool) {
+        spillFromMemory();
+        return true;
+      }
+    }
+
+    // Merge on-disk batches if we have too many.
+
+    int mergeCount = spilledRunsCount - maxMergeWidth;
+    if (mergeCount <= 0) {
+      return false;
+    }
+
+    // We will merge. This will create yet another spilled
+    // run. Account for that.
+
+    mergeCount += 1;
+
+    // Must merge at least 2 batches to make progress.
+
+    mergeCount = Math.max(2, mergeCount);
+
+    mergeCount = Math.min(mergeCount, maxMergeWidth);
+
+    // Do the merge, then loop to try again in case not
+    // all the target batches spilled in one go.
+
+    logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
+        mergeCount, allocator.getAllocatedMemory());
+    mergeAndSpill(spilledRuns, mergeCount);
+    return true;
+  }
+
+  /**
+   * This operator has accumulated a set of sorted incoming record batches.
+   * We wish to spill some of them to disk. To do this, a "copier"
+   * merges the target batches to produce a stream of new (merged) batches
+   * which are then written to disk.
+   * <p>
+   * This method spills only half the accumulated batches,
+   * minimizing unnecessary disk writes. The exact count must lie between
+   * the minimum and maximum spill counts.
+   */
+
+  private void spillFromMemory() {
+    int spillCount;
+    if (spillFileSize == 0) {
+
+      // If no spill limit given, guess half the batches.
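+      // Illustration (hypothetical): with 10 buffered batches and no
+      // configured spill file size, the fallback below spills 10 / 2 = 5
+      // batches.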
+
+      spillCount = bufferedBatches.size() / 2;
--- End diff --

Actually, this line is here only as a fallback. Notice the surrounding if. We are, in fact, driven by the spill file size. Changed this to set a minimum spill file size in setup, then eliminated this line here.

> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10.0
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly-defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, included as sub-tasks of this umbrella task.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)