GitHub user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/984#discussion_r145574026 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java --- @@ -20,22 +20,58 @@ import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; public interface PriorityQueue { - public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException; - public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException; - public void generate() throws SchemaChangeException; - public VectorContainer getHyperBatch(); - public SelectionVector4 getHeapSv4(); - public SelectionVector4 getFinalSv4(); - public boolean validate(); - public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException; - public void cleanup(); - - public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class); + /** + * The elements in the given batch are added to the priority queue. Note that the priority queue + * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)} + * method. + * @param batch The batch containing elements we want to add. + * @throws SchemaChangeException + */ + void add(RecordBatchData batch) throws SchemaChangeException; + /** + * Initializes the priority queue. This method must be called before any other methods on the priority + * queue are called. 
+ * @param limit The size of the priority queue. + * @param allocator The {@link BufferAllocator} to use when creating the priority queue. + * @param hasSv2 True when incoming batches have v2 selection vectors. False otherwise. + * @throws SchemaChangeException + */ + void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException; + + /** + * This method must be called before fetching the final heap hyper batch and final Sv4 vector. + * @throws SchemaChangeException + */ + void generate() throws SchemaChangeException; + + /** + * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called + * after {@link #generate()}. + * @return The final priority queue HyperBatch containing the results. + */ + VectorContainer getHyperBatch(); + + SelectionVector4 getSv4(); + + /** + * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch + * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}. + * @return The selection vector used to select the elements in the priority queue. + */ + SelectionVector4 getFinalSv4(); --- End diff -- The code already works that way. The config option `drill.exec.sort.purge.threshold` controls the maximum number of batches allowed in the hyper batch. Once the threshold is exceeded, the top N values are consolidated into a single batch and the process repeats. There is still a problem when the limit is large (e.g. 100,000,000): in that case the operator keeps all of the records in memory and eventually fails. A JIRA has been filed to address this issue: [DRILL-5823](https://issues.apache.org/jira/browse/DRILL-5823)
---