Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/984#discussion_r144945381

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java ---
    @@ -20,22 +20,58 @@
     import org.apache.drill.exec.compile.TemplateClassDefinition;
     import org.apache.drill.exec.exception.SchemaChangeException;
     import org.apache.drill.exec.memory.BufferAllocator;
    -import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
     import org.apache.drill.exec.record.VectorContainer;
     import org.apache.drill.exec.record.selection.SelectionVector4;
     public interface PriorityQueue {
    -  public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
    -  public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    -  public void generate() throws SchemaChangeException;
    -  public VectorContainer getHyperBatch();
    -  public SelectionVector4 getHeapSv4();
    -  public SelectionVector4 getFinalSv4();
    -  public boolean validate();
    -  public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException;
    -  public void cleanup();
    -
    -  public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class);
    +  /**
    +   * The elements in the given batch are added to the priority queue. Note that the priority queue
    +   * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)}
    +   * method.
    +   * @param batch The batch containing elements we want to add.
    +   * @throws SchemaChangeException
    +   */
    +  void add(RecordBatchData batch) throws SchemaChangeException;
    +  /**
    +   * Initializes the priority queue. This method must be called before any other methods on the priority
    +   * queue are called.
    +   * @param limit The size of the priority queue.
    +   * @param allocator The {@link BufferAllocator} to use when creating the priority queue.
    +   * @param hasSv2 True when incoming batches have v2 selection vectors. False otherwise.
    +   * @throws SchemaChangeException
    +   */
    +  void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    +
    +  /**
    +   * This method must be called before fetching the final heap hyper batch and final Sv4 vector.
    +   * @throws SchemaChangeException
    +   */
    +  void generate() throws SchemaChangeException;
    +
    +  /**
    +   * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called
    +   * after {@link #generate()}.
    +   * @return The final priority queue HyperBatch containing the results.
    +   */
    +  VectorContainer getHyperBatch();
    +
    +  SelectionVector4 getSv4();
    +
    +  /**
    +   * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch
    +   * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}.
    +   * @return The selection vector used to select the elements in the priority queue.
    +   */
    +  SelectionVector4 getFinalSv4();
    --- End diff --

Taking a step back...

Suppose I do a top 100. Also, suppose my batches are big, say 100 MB each. Also, suppose I have fiendishly organized my data so that the top 100 values are the first values of 100 batches.

If the priority queue uses an SV4, then the queue holds onto each entire batch that contains any of the top values. In my case, since the top 100 values occur across 100 batches, I'm holding onto 100 batches of 100 MB each, for a total of 10 GB of memory.

Is this how it works?

Do we need to think about compaction at some point? Once we have buffered, say, five batches, we copy the top values to a new, much smaller, batch and discard the inputs. We repeat this over and over to keep the number of buffered batches under control.
Once we control batch size (in bytes; we already control records), we'll have the means to set a memory budget for TopN, then use consolidation to stay within that budget. Or, is this how the code already works?
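
To make the compaction idea concrete, here is a rough sketch in plain Java. It is a toy model, not Drill code: it uses `long[]` arrays in place of record batches and does not touch `SelectionVector4` or `VectorContainer`. The class name `CompactingTopN`, the `maxBuffered` threshold, and all method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of TopN with periodic compaction. Hypothetical; not Drill code. */
class CompactingTopN {
  private final int limit;        // N in "top N"
  private final int maxBuffered;  // hypothetical threshold: compact once this many batches are held
  private final List<long[]> buffered = new ArrayList<>();

  CompactingTopN(int limit, int maxBuffered) {
    this.limit = limit;
    this.maxBuffered = maxBuffered;
  }

  /** Buffers a batch; compacts when the number of held batches reaches the threshold. */
  void add(long[] batch) {
    buffered.add(batch);
    if (buffered.size() >= maxBuffered) {
      compact();
    }
  }

  /** Copies the current top values into one small batch and drops the inputs. */
  private void compact() {
    long[] top = topValues();
    buffered.clear();
    buffered.add(top);
  }

  /** Returns up to {@code limit} largest values across all buffered batches, in descending order. */
  long[] topValues() {
    long[] all = buffered.stream().flatMapToLong(b -> Arrays.stream(b)).toArray();
    Arrays.sort(all);  // ascending order
    int n = Math.min(limit, all.length);
    long[] top = new long[n];
    for (int i = 0; i < n; i++) {
      top[i] = all[all.length - 1 - i];
    }
    return top;
  }

  int bufferedBatchCount() {
    return buffered.size();
  }

  long bufferedValueCount() {
    return buffered.stream().mapToLong(b -> b.length).sum();
  }

  public static void main(String[] args) {
    // The "fiendish" layout: each batch's first value is one of the top 100.
    CompactingTopN queue = new CompactingTopN(100, 5);
    for (int i = 0; i < 100; i++) {
      long[] batch = new long[1000];
      batch[0] = 1_000_000L + i;
      for (int j = 1; j < batch.length; j++) {
        batch[j] = j;
      }
      queue.add(batch);
    }
    System.out.println("batches held: " + queue.bufferedBatchCount());
    System.out.println("values held:  " + queue.bufferedValueCount());
    System.out.println("largest:      " + queue.topValues()[0]);
  }
}
```

In this model the queue never holds more than five batches at once, whereas the SV4 scenario above would pin all 100 inputs. The cost is the extra copy on each compaction; a real implementation would presumably trigger on a byte budget rather than a batch count.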
---