[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847983#comment-15847983 ]
ASF GitHub Bot commented on DRILL-5080: --------------------------------------- Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/717#discussion_r98802888 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java --- @@ -0,0 +1,1321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.xsort.managed; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.physical.config.ExternalSort; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; +import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; +import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.SchemaUtil; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; + +import com.google.common.collect.Lists; + +/** + * External sort batch: a sort batch which can spill to disk in + * order to operate within a defined memory footprint. + * <p> + * <h4>Basic Operation</h4> + * The operator has three key phases: + * <p> + * <ul> + * <li>The load phase in which batches are read from upstream.</li> + * <li>The merge phase in which spilled batches are combined to + * reduce the number of files below the configured limit. (Best + * practice is to configure the system to avoid this phase.) 
+ * <li>The delivery phase in which batches are combined to produce + * the final output.</li> + * </ul> + * During the load phase: + * <p> + * <ul> + * <li>The incoming (upstream) operator provides a series of batches.</li> + * <li>This operator sorts each batch, and accumulates them in an in-memory + * buffer.</li> + * <li>If the in-memory buffer becomes too large, this operator selects + * a subset of the buffered batches to spill.</li> + * <li>Each spill set is merged to create a new, sorted collection of + * batches, and each is spilled to disk.</li> + * <li>To allow the use of multiple disks for storage, each spill group is written + * round-robin to a set of spill directories.</li> + * </ul> + * <p> + * During the sort/merge phase: + * <p> + * <ul> + * <li>When the input operator is complete, this operator merges the accumulated + * batches (which may be all in memory or partially on disk), and returns + * them to the output (downstream) operator in chunks of no more than + * 32K records.</li> + * <li>The final merge must combine a collection of in-memory and spilled + * batches. Several limits apply to the maximum "width" of this merge. For + * example, each open spill run consumes a file handle, and we may wish + * to limit the number of file handles. A consolidation phase combines + * in-memory and spilled batches prior to the final merge to control final + * merge width.</li> + * <li>A special case occurs if no batches were spilled. In this case, the input + * batches are sorted in memory without merging.</li> + * </ul> + * <p> + * Many complex details are involved in doing the above; the details are explained + * in the methods of this class. + * <p> + * <h4>Configuration Options</h4> + * <dl> + * <dt>drill.exec.sort.external.spill.fs</dt> + * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd> + * <dt>drill.exec.sort.external.spill.directories</dt> + * <dd>The (comma? space?) separated list of directories, on the above file + * system, to which to spill files in round-robin fashion. The query will + * fail if any one of the directories becomes full.</dd> + * <dt>drill.exec.sort.external.spill.file_size</dt> + * <dd>Target size for first-generation spill files. Set this large + * enough to get nice long writes, but not so large that spill directories + * are overwhelmed.</dd> + * <dt>drill.exec.sort.external.mem_limit</dt> + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.batch_limit</dt> + * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.max_count</dt> + * <dd>Maximum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.spill.min_count</dt> + * <dd>Minimum number of batches to add to “first generation” files. + * Defaults to 0 (no limit). (Primarily for testing.)</dd> + * <dt>drill.exec.sort.external.merge_limit</dt> + * <dd>Sets the maximum number of runs to be merged in a single pass (limits + * the number of open files.)</dd> + * </dl> + * <p> + * The memory limit observed by this operator is the least of: + * <ul> + * <li>The maximum allocation allowed to the allocator assigned to this batch,</li> + * <li>The maximum limit set for this operator by the Foreman, or</li> + * <li>The maximum limit configured in the mem_limit parameter above.
(Primarily for + * testing.)</li> + * </ul> + * <h4>Output</h4> + * It is helpful to note that the sort operator will produce one of two kinds of + * output batches. + * <ul> + * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses + * the entire in-memory sort set. A selection vector remover will copy results + * into new batches of a size determined by that operator.</li> + * <li>A series of batches, without a selection vector, if the sort spills to + * disk. In this case, the downstream operator will still be a selection vector + * remover, but there is nothing for that operator to remove. Each batch is + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li> + * </ul> + * Note that, even in the in-memory sort case, this operator could do the copying + * to eliminate the extra selection vector remover. That is left as an exercise + * for another time. + * <h4>Logging</h4> + * Logging in this operator serves two purposes: + * <ul> + * <li>Normal diagnostic information.</li> + * <li>Capturing the essence of the operator functionality for analysis in unit + * tests.</li> + * </ul> + * Test logging is designed to capture key events and timings. Take care + * when changing or removing log messages as you may need to adjust unit tests + * accordingly. + */ + +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class); + protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class); + + private final RecordBatch incoming; + + /** + * Memory allocator for this operator itself. Incoming batches are + * transferred into this allocator. Intermediate batches used during + * merge also reside here. + */ + + private final BufferAllocator allocator; + + /** + * Schema of batches that this operator produces. + */ + + private BatchSchema schema; + + private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList(); + private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList(); + private SelectionVector4 sv4; + + /** + * The number of records to add to each output batch sent to the + * downstream operator or spilled to disk. + */ + + private int outputBatchRecordCount; + private int peakNumBatches = -1; + + /** + * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target + * number of records to return in each batch. + * <p> + * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}. + * <p> + * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a + * design flaw that may give rise to fatal memory fragmentation if we allow it to + * allocate larger vectors. + */ + + private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024; + + /** + * Smallest allowed output batch size. The smallest output batch + * created even under constrained memory conditions. + */ + private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024; + + /** + * The preferred amount of memory to set aside for output batches, + * expressed as a ratio of available memory. + */ + + private static final float MERGE_BATCH_ALLOWANCE = 0.10F; + + public static final String INTERRUPTION_AFTER_SORT = "after-sort"; + public static final String INTERRUPTION_AFTER_SETUP = "after-setup"; + public static final String INTERRUPTION_WHILE_SPILLING = "spilling"; + + private long memoryLimit; + + /** + * Iterates over the final, sorted results.
+ */ + + private SortResults resultsIterator; + + /** + * Manages the set of spill directories and files. + */ + + private final SpillSet spillSet; + + /** + * Manages the copier used to merge a collection of batches into + * a new set of batches. + */ + + private final CopierHolder copierHolder; + + private enum SortState { START, LOAD, DELIVER, DONE } + private SortState sortState = SortState.START; + private int inputRecordCount = 0; + private int inputBatchCount = 0; // total number of batches received so far + private final OperatorCodeGenerator opCodeGen; + + /** + * Estimated size of the records for this query, updated on each + * new batch received from upstream. + */ + + private int estimatedRecordSize; + + /** + * Estimated size of the spill and output batches that this + * operator produces, estimated from the estimated record + * size. + */ + + private long estimatedOutputBatchSize; + private long estimatedInputBatchSize; + + /** + * Maximum number of batches to hold in memory. + * (Primarily for testing.) + */ + + private int bufferedBatchLimit; + private int mergeLimit; + private int minSpillLimit; + private int maxSpillLimit; + private long spillFileSize; + private long minimumBufferSpace; + + /** + * Minimum memory level before spilling occurs. That is, we can buffer input + * batches in memory until we are down to the level given by the spill point. + */ + + private long spillPoint; + private long mergeMemoryPool; + private long preferredMergeBatchSize; + + // WARNING: The enum here is used within this class. But, the members of + // this enum MUST match those in the (unmanaged) ExternalSortBatch since + // that is the enum used in the UI to display metrics for the query profile. + + public enum Metric implements MetricDef { + SPILL_COUNT, // number of times operator spilled to disk + RETIRED1, // Was: peak value for totalSizeInMemory + // But operator already provides this value + PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory + MERGE_COUNT, // Number of second+ generation merges + MIN_BUFFER, // Minimum memory level observed in operation. + INPUT_BATCHES; // Number of batches read from upstream. + + @Override + public int metricId() { + return ordinal(); + } + } + + /** + * Iterates over the final sorted results. Implemented differently + * depending on whether the results are in-memory or spilled to + * disk. + */ + + public interface SortResults { + boolean next(); + void close(); + int getBatchCount(); + int getRecordCount(); + } + + public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) { + super(popConfig, context, true); + this.incoming = incoming; + allocator = oContext.getAllocator(); + opCodeGen = new OperatorCodeGenerator(context, popConfig); + + spillSet = new SpillSet(context, popConfig); + copierHolder = new CopierHolder(context, allocator, opCodeGen); + configure(context.getConfig()); + } + + private void configure(DrillConfig config) { + + // The maximum memory this operator can use. It is either the + // limit set on the allocator or on the operator, whichever is + // less. + + memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit()); + + // Optional configured memory limit, typically used only for testing. + + long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY); + if (configLimit > 0) { + memoryLimit = Math.min(memoryLimit, configLimit); + } + + // Optional limit on the number of buffered in-memory batches. + // 0 means no limit. 
Used primarily for testing. Must allow at least two + // batches or no merging can occur. + + bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2); + + // Optional limit on the number of spilled runs to merge in a single + // pass. Limits the number of open file handles. Must allow at least + // two batches to merge to make progress. + + mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2); + + // Limits on the minimum and maximum buffered batches to spill per + // spill event. + + minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2); + maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2); + if (minSpillLimit > maxSpillLimit) { + minSpillLimit = Math.min(minSpillLimit, maxSpillLimit); + maxSpillLimit = minSpillLimit; + } + + // Limits the size of first-generation spill files. + + spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE); + + // Set the target output batch size. Use the maximum size, but only if + // this represents less than 10% of available memory. Otherwise, use 10% + // of memory, but no smaller than the minimum size. In any event, an + // output batch can contain no fewer than a single record. + + long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE); + preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE); + preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE); + + logger.debug("Config: memory limit = {}, batch limit = {}, " + + "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}", + memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit, + preferredMergeBatchSize); + } + + private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) { + int limit = config.getInt(paramName); + if (limit > 0) { + limit = Math.max(limit, minValue); + } else { + limit = valueIfZero; + } + return limit; + } + + @Override + public int getRecordCount() { + if (sv4 != null) { + return sv4.getCount(); + } + return container.getRecordCount(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + return sv4; + } + + private void closeBatchGroups(Collection<? extends BatchGroup> groups) { + for (BatchGroup group: groups) { + try { + group.close(); + } catch (Exception e) { + // Collect all failures and make sure to clean up all remaining batches. + // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources(), + // where it would have been passed to context.fail(). + // Passing the exception directly to context.fail(e) lets the cleanup process continue instead of stopping + // right away; it also makes sure we collect any additional exceptions we may get while cleaning up. + context.fail(e); + } + } + } + + @Override + public void close() { + try { + if (bufferedBatches != null) { + closeBatchGroups(bufferedBatches); + bufferedBatches = null; + } + if (spilledRuns != null) { + closeBatchGroups(spilledRuns); + spilledRuns = null; + } + } finally { + if (sv4 != null) { + sv4.clear(); + } + if (resultsIterator != null) { + resultsIterator.close(); + } + copierHolder.close(); + spillSet.close(); + opCodeGen.close(); + + // The call to super.close() clears out the output container. + // Doing so requires the allocator here, so it must be closed + // after the super call.
+ + super.close(); + allocator.close(); + } + } + + /** + * Called by {@link AbstractRecordBatch} as a fast-path to obtain + * the first record batch and set up the schema of this batch in order + * to quickly return the schema to the client. Note that this method + * fetches the first batch from upstream which will be waiting for + * us the first time that {@link #innerNext()} is called. + */ + + @Override + public void buildSchema() { + IterOutcome outcome = next(incoming); + switch (outcome) { + case OK: + case OK_NEW_SCHEMA: + for (VectorWrapper<?> w : incoming) { + @SuppressWarnings("resource") + ValueVector v = container.addOrGet(w.getField()); + if (v instanceof AbstractContainerVector) { + w.getValueVector().makeTransferPair(v); // Can we remove this hack? + v.clear(); + } + v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO) + } + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(0); + break; + case STOP: + state = BatchState.STOP; + break; + case OUT_OF_MEMORY: + state = BatchState.OUT_OF_MEMORY; + break; + case NONE: + state = BatchState.DONE; + break; + default: + break; + } + } + + /** + * Process each request for a batch. The first request retrieves + * all the incoming batches and sorts them, optionally spilling to + * disk as needed. Subsequent calls retrieve the sorted results in + * fixed-size batches. + */ + + @Override + public IterOutcome innerNext() { + switch (sortState) { + case DONE: + return IterOutcome.NONE; + case START: + case LOAD: + return load(); + case DELIVER: + return nextOutputBatch(); + default: + throw new IllegalStateException("Unexpected sort state: " + sortState); + } + } + + private IterOutcome nextOutputBatch() { + if (resultsIterator.next()) { + return IterOutcome.OK; + } else { + logger.trace("Deliver phase complete: Returned {} batches, {} records", + resultsIterator.getBatchCount(), resultsIterator.getRecordCount()); + sortState = SortState.DONE; + return IterOutcome.NONE; + } + } + + /** + * Load and process a single batch, handling schema changes. In general, the + * external sort accepts only one schema. It can handle compatible schemas + * (which seems to mean the same columns in possibly different orders.) + * + * @return return code depending on the amount of data read from upstream + */ + + private IterOutcome loadBatch() { + + // If this is the very first batch, then AbstractRecordBatch + // already loaded it for us in buildSchema(). + + IterOutcome upstream; + if (sortState == SortState.START) { + sortState = SortState.LOAD; + upstream = IterOutcome.OK_NEW_SCHEMA; + } else { + upstream = next(incoming); + } + switch (upstream) { + case NONE: + return upstream; + case NOT_YET: + throw new UnsupportedOperationException(); + case STOP: + return upstream; + case OK_NEW_SCHEMA: + case OK: + setupSchema(upstream); + + // Add the batch to the in-memory generation, spilling if + // needed. + + processBatch(); + break; + case OUT_OF_MEMORY: + + // Note: it is highly doubtful that this code actually works. It + // requires that the upstream batches got to a safe place to run + // out of memory and that no work was in-flight and thus abandoned. + // Consider removing this case once resource management is in place.
+ + logger.debug("received OUT_OF_MEMORY, trying to spill"); + if (bufferedBatches.size() > 2) { + spillFromMemory(); + } else { + logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream"); + return IterOutcome.OUT_OF_MEMORY; + } + break; + default: + throw new UnsupportedOperationException(); + } + return IterOutcome.OK; + } + + /** + * Load the results and sort them. May bail out early if an exceptional + * condition is passed up from the input batch. + * + * @return return code: OK_NEW_SCHEMA if rows were sorted, + * NONE if no rows + */ + + private IterOutcome load() { + logger.trace("Start of load phase"); + + // Clear the temporary container created by + // buildSchema(). + + container.clear(); + + // Loop over all input batches + + for (;;) { + IterOutcome result = loadBatch(); + + // None means all batches have been read. + + if (result == IterOutcome.NONE) { + break; } + + // Any outcome other than OK means something went wrong. + + if (result != IterOutcome.OK) { + return result; } + } + + // Anything to actually sort? + + if (inputRecordCount == 0) { + sortState = SortState.DONE; + return IterOutcome.NONE; + } + logger.trace("Completed load phase: read {} batches", inputBatchCount); + + // Do the merge of the loaded batches. The merge can be done entirely in memory if + // the results fit; else we have to do a disk-based merge of + // pre-sorted spilled batches. + + if (canUseMemoryMerge()) { + return sortInMemory(); + } else { + return mergeSpilledRuns(); + } + } + + /** + * All data has been read from the upstream batch. Determine if we + * can use a fast in-memory sort, or must use a merge (which typically, + * but not always, involves spilled batches.) + * + * @return whether sufficient resources exist to do an in-memory sort + * if all batches are still in memory + */ + + private boolean canUseMemoryMerge() { + if (spillSet.hasSpilled()) { return false; } + + // Do we have enough memory for MSorter (the in-memory sorter)? + + long allocMem = allocator.getAllocatedMemory(); + long availableMem = memoryLimit - allocMem; + long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount); + if (availableMem < neededForInMemorySort) { return false; } + + // Make sure we don't exceed the maximum number of batches SV4 can address. + + if (bufferedBatches.size() > Character.MAX_VALUE) { return false; } + + // We can do an in-memory merge. + + return true; + } + + /** + * Handle a new schema from upstream. The ESB is quite limited in its ability + * to handle schema changes. + * + * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA + */ + + private void setupSchema(IterOutcome upstream) { + + // First batch: we won't have a schema. + + if (schema == null) { + schema = incoming.getSchema(); + + // Subsequent batches, nothing to do if same schema. + + } else if (upstream == IterOutcome.OK) { + return; + + // Only change in the case that the schema truly changes. Artificial schema changes are ignored. + + } else if (incoming.getSchema().equals(schema)) { + return; + } else if (unionTypeEnabled) { + schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema()); --- End diff -- Would this ever work? Was this tested? Can the ESB handle the resulting union type (e.g., if a sort column's type has changed in the incoming schema)?
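For readers checking the memory policy in this diff, the in-memory-sort decision in canUseMemoryMerge() can be restated as a small standalone method. This is an illustrative sketch only, not the patch's code: the method signature and the sample numbers in main() are invented, while the three checks mirror the diff (no spills yet, enough free memory for the in-memory sorter, and a batch count that an SV4 can address with its 16-bit batch ids).

public class InMemorySortDecisionExample {
  // Mirrors the three checks in canUseMemoryMerge(). The caller supplies the
  // values that the operator reads from its allocator and MSortTemplate.
  static boolean canUseMemoryMerge(boolean hasSpilled, long memoryLimit,
      long allocatedMemory, long neededForInMemorySort, int bufferedBatchCount) {
    if (hasSpilled) { return false; }                     // already spilled: must merge
    long availableMem = memoryLimit - allocatedMemory;
    if (availableMem < neededForInMemorySort) { return false; } // MSorter would not fit
    return bufferedBatchCount <= Character.MAX_VALUE;     // SV4 batch-id limit (65535)
  }

  public static void main(String[] args) {
    // 100 MB limit, 60 MB allocated, sorter needs 10 MB, 500 batches: in-memory sort.
    System.out.println(canUseMemoryMerge(false, 100 << 20, 60 << 20, 10 << 20, 500)); // true
    // Same numbers, but a spill already happened: must do a disk-based merge.
    System.out.println(canUseMemoryMerge(true, 100 << 20, 60 << 20, 10 << 20, 500));  // false
  }
}

Likewise, the merge-batch sizing rule in configure() can be exercised in isolation. The constants are copied from the diff; the sample memory limits are invented.

public class MergeBatchSizeExample {
  static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024; // 16 MB cap
  static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;       // 256 KB floor
  static final float MERGE_BATCH_ALLOWANCE = 0.10F;          // 10% of memory

  // Same arithmetic as configure(): 10% of the memory limit, clamped to
  // [MIN_MERGED_BATCH_SIZE, MAX_MERGED_BATCH_SIZE].
  static long preferredMergeBatchSize(long memoryLimit) {
    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    long size = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    return Math.max(size, MIN_MERGED_BATCH_SIZE);
  }

  public static void main(String[] args) {
    System.out.println(preferredMergeBatchSize(1L << 30));   // 1 GB limit  -> 16 MB cap
    System.out.println(preferredMergeBatchSize(100L << 20)); // 100 MB limit -> 10 MB (10%)
    System.out.println(preferredMergeBatchSize(1L << 20));   // 1 MB limit  -> 256 KB floor
  }
}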
> Create a memory-managed version of the External Sort operator > ------------------------------------------------------------- > > Key: DRILL-5080 > URL: https://issues.apache.org/jira/browse/DRILL-5080 > Project: Apache Drill > Issue Type: Improvement > Affects Versions: 1.8.0 > Reporter: Paul Rogers > Assignee: Paul Rogers > Fix For: 1.10 > > Attachments: ManagedExternalSortDesign.pdf > > > We propose to create a "managed" version of the external sort operator that > works to a clearly-defined memory limit. Attached is a design specification > for the work. > The project will include fixing a number of bugs related to the external > sort, included as sub-tasks of this umbrella task. -- This message was sent by Atlassian JIRA (v6.3.15#6346)