[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850642#comment-15850642 ]
ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99040069

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java ---
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Represents a group of batches spilled to disk.
+ * <p>
+ * The batches are defined by a schema which can change over time. When the
+ * schema changes, all existing and new batches are coerced into the new
+ * schema. Provides a uniform way to iterate over records for one or more
+ * batches whether the batches are in memory or on disk.
+ * <p>
+ * The <code>BatchGroup</code> operates in two modes as given by the two
+ * subclasses:
+ * <ul>
+ * <li>Input mode ({@link InputBatch}): Used to buffer in-memory batches
+ * prior to spilling.</li>
+ * <li>Spill mode ({@link SpilledRun}): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.</li>
+ * </ul>
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
+
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling.
+   * The structure of the data is:
+   * <ul>
+   * <li>Contains a single batch received from the upstream (input)
+   * operator.</li>
+   * <li>Associated selection vector that provides a sorted
+   * indirection to the values in the batch.</li>
+   * </ul>
+   */
+
+  public static class InputBatch extends BatchGroup {
+    private SelectionVector2 sv2;
+
+    public InputBatch(VectorContainer container, SelectionVector2 sv2, OperatorContext context, long batchSize) {
+      super(container, context, batchSize);
+      this.sv2 = sv2;
+    }
+
+    public SelectionVector2 getSv2() {
+      return sv2;
+    }
+
+    @Override
+    public int getRecordCount() {
+      if (sv2 != null) {
+        return sv2.getCount();
+      } else {
+        return super.getRecordCount();
+      }
+    }
+
+    @Override
+    public int getNextIndex() {
+      int val = super.getNextIndex();
+      if (val == -1) {
+        return val;
+      }
+      // Map the logical (sorted) position to the physical row offset.
+      return sv2.getIndex(val);
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        if (sv2 != null) {
+          sv2.clear();
+        }
+      }
+    }
+  }
+
+  /**
+   * Holds a set of spilled batches, represented by a file on disk.
+   * Handles reads from, and writes to, the spill file. The data structure
+   * is:
+   * <ul>
+   * <li>A pointer to a file that contains serialized batches.</li>
+   * <li>When writing, each batch is appended to the output file.</li>
+   * <li>When reading, iterates over each spilled batch, and for each
+   * of those, each spilled record.</li>
+   * </ul>
+   * <p>
+   * Starts out with no current batch. Defines the current batch to be the
+   * shell (schema without data) of the last batch spilled to disk.
+   * <p>
+   * When reading, has destructive read-once behavior: closing the
+   * batch (after reading) deletes the underlying spill file.
+   * <p>
+   * This single class does three tasks: load data, hold data and
+   * read data. This should be split into three separate classes. But,
+   * the original (combined) structure is retained for expedience at
+   * present.
+   */
+
+  public static class SpilledRun extends BatchGroup {
+    private InputStream inputStream;
+    private OutputStream outputStream;
+    private String path;
+    private SpillSet spillSet;
+    private BufferAllocator allocator;
+    private int spilledBatches = 0;
+
+    public SpilledRun(SpillSet spillSet, String path, OperatorContext context, long batchSize) throws IOException {
+      super(null, context, batchSize);
+      this.spillSet = spillSet;
+      this.path = path;
+      this.allocator = context.getAllocator();
+      outputStream = spillSet.openForOutput(path);
+    }
+
+    public void addBatch(VectorContainer newContainer) throws IOException {
+      int recordCount = newContainer.getRecordCount();
+      @SuppressWarnings("resource")
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, newContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      outputBatch.writeToStream(outputStream);
+      newContainer.zeroVectors();
+      logger.trace("Wrote {} records in {} us", recordCount, watch.elapsed(TimeUnit.MICROSECONDS));
+      spilledBatches++;
+
+      // Hold onto the husk of the last added container so that we have a
+      // current container when starting to read rows back later.
+
+      currentContainer = newContainer;
+      currentContainer.setRecordCount(0);
+    }
+
+    @Override
+    public int getNextIndex() {
+      if (pointer == getRecordCount()) {
+        // Finished the current batch; load the next spilled batch, if any.
+        if (spilledBatches == 0) {
+          return -1;
+        }
+        try {
+          currentContainer.zeroVectors();
+          getBatch();
+        } catch (IOException e) {
+          // Release any partially-loaded data.
+          currentContainer.clear();
+          throw UserException.dataReadError(e)
+              .message("Failure while reading spilled data")
+              .build(logger);
+        }
+        // Return the first row of the newly-loaded batch.
+        pointer = 1;
+        return 0;
+      }
+      return super.getNextIndex();
+    }
+
+    private VectorContainer getBatch() throws IOException {
+      if (inputStream == null) {
+        inputStream = spillSet.openForInput(path);
+      }
+      VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      vas.readFromStream(inputStream);
+      VectorContainer c = vas.get();
+      if (schema != null) {
+        c = SchemaUtil.coerceContainer(c, schema, context);
+      }
+      logger.trace("Read {} records in {} us", c.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
+      spilledBatches--;
+      currentContainer.zeroVectors();
+      // Transfer the vectors of the just-read batch into the current container.
+      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
+      for (@SuppressWarnings("rawtypes") VectorWrapper w : currentContainer) {
+        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
+        pair.transfer();
+      }
+      currentContainer.setRecordCount(c.getRecordCount());
+      c.zeroVectors();
+      return c;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        super.close();
+      } finally {
+        try {
+          closeOutputStream();
+        } finally {
+          try {
+            if (inputStream != null) {
+              inputStream.close();
+              inputStream = null;
+              logger.trace("Summary: Read {} bytes from {}", dataSize, path);
+            }
+          } finally {
+            spillSet.delete(path);
+          }
+        }
+      }
+    }
+
+    public void closeOutputStream() throws IOException {
+      if (outputStream != null) {
+        outputStream.close();
+        outputStream = null;
+        logger.trace("Summary: Wrote {} bytes to {}", dataSize, path);
+      }
+    }
+  }
+
+  protected VectorContainer currentContainer;
+  protected int pointer = 0;
+  protected OperatorContext context;
+  protected BatchSchema schema;
+  protected long dataSize;
+
+  public BatchGroup(VectorContainer container, OperatorContext context, long dataSize) {
+    this.currentContainer = container;
+    this.context = context;
+    this.dataSize = dataSize;
+  }
+
+  /**
+   * Updates the schema for this batch group. The current as well as any
+   * deserialized batches will be coerced to this schema.
+   * @param schema the schema to which current and future batches are coerced
+   */
+  public void setSchema(BatchSchema schema) {
+    currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context);
+    this.schema = schema;
+  }
+
+  public int getNextIndex() {
+    if (pointer == getRecordCount()) {
+      return -1;
+    }
+    int val = pointer++;
+    assert val < currentContainer.getRecordCount();
--- End diff --

True. Original code. I suppose it is trying to catch any random increments to pointer elsewhere? Or forgetting to reset pointer when reading batches from a spill file?
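For illustration, here is a minimal, self-contained model of the invariant that assert protects (hypothetical BatchCursor class, not part of the patch; run with java -ea to enable assertions):

    // Simplified model of BatchGroup's pointer/record-count bookkeeping.
    class BatchCursor {
      private int pointer = 0;     // next row to hand out in the current batch
      private int recordCount = 5; // rows in the current batch

      // Mirrors getNextIndex(): returns -1 at the end of the batch.
      int getNextIndex() {
        if (pointer == recordCount) {
          return -1;
        }
        int val = pointer++;
        // Fires if pointer was bumped elsewhere, or was not reset to 0
        // when a smaller batch replaced the current one.
        assert val < recordCount : "pointer ran past the loaded batch";
        return val;
      }

      // Simulates loading the next spilled batch; omitting the reset below
      // is exactly the bug the assert can catch.
      void loadNextBatch(int newCount) {
        recordCount = newCount;
        pointer = 0;
      }

      public static void main(String[] args) {
        BatchCursor cursor = new BatchCursor();
        for (int i = cursor.getNextIndex(); i != -1; i = cursor.getNextIndex()) {
          System.out.println("row " + i);
        }
        cursor.loadNextBatch(3);
        System.out.println("after reload: " + cursor.getNextIndex()); // prints 0
      }
    }

Note the guard is only partial: if the newly-loaded batch is larger than the stale pointer value, rows are silently skipped rather than tripping the assert, so it mainly documents the invariant.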
> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10.0
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly-defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, included as sub-tasks of this umbrella task.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)