Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/984#discussion_r145574026
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java ---
    @@ -20,22 +20,58 @@
     import org.apache.drill.exec.compile.TemplateClassDefinition;
     import org.apache.drill.exec.exception.SchemaChangeException;
     import org.apache.drill.exec.memory.BufferAllocator;
    -import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
     import org.apache.drill.exec.record.VectorContainer;
     import org.apache.drill.exec.record.selection.SelectionVector4;
     
     public interface PriorityQueue {
    -  public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
    -  public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    -  public void generate() throws SchemaChangeException;
    -  public VectorContainer getHyperBatch();
    -  public SelectionVector4 getHeapSv4();
    -  public SelectionVector4 getFinalSv4();
    -  public boolean validate();
    -  public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException;
    -  public void cleanup();
    -
    -  public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class);
    +  /**
    +   * The elements in the given batch are added to the priority queue. Note that the priority queue
    +   * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)}
    +   * method.
    +   * @param batch The batch containing elements we want to add.
    +   * @throws SchemaChangeException
    +   */
    +  void add(RecordBatchData batch) throws SchemaChangeException;
     
    +  /**
    +   * Initializes the priority queue. This method must be called before any other methods on the priority
    +   * queue are called.
    +   * @param limit The size of the priority queue.
    +   * @param allocator The {@link BufferAllocator} to use when creating the priority queue.
    +   * @param hasSv2 True when incoming batches have v2 selection vectors. False otherwise.
    +   * @throws SchemaChangeException
    +   */
    +  void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    +
    +  /**
    +   * This method must be called before fetching the final heap hyper batch and final Sv4 vector.
    +   * @throws SchemaChangeException
    +   */
    +  void generate() throws SchemaChangeException;
    +
    +  /**
    +   * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called
    +   * after {@link #generate()}.
    +   * @return The final priority queue HyperBatch containing the results.
    +   */
    +  VectorContainer getHyperBatch();
    +
    +  SelectionVector4 getSv4();
    +
    +  /**
    +   * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch
    +   * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}.
    +   * @return The selection vector used to select the elements in the priority queue.
    +   */
    +  SelectionVector4 getFinalSv4();
    --- End diff --
    
    The code already works that way. The config option 
<b>drill.exec.sort.purge.threshold</b> controls the maximum number of 
batches allowed in the hyper batch. Once the threshold is exceeded, the top N 
values are consolidated into a single batch and the process repeats. There 
is still an issue when the limit is large, e.g. 100,000,000: in that 
case the operator keeps all the records in memory and dies. A Jira has been 
created to address this issue: 
[https://issues.apache.org/jira/browse/DRILL-5823]
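
    The purge behavior described above can be sketched roughly as follows. This is a simplified illustration, not Drill's actual PriorityQueueTemplate: the class name TopNPurgeSketch, its methods, and the use of plain int arrays as "batches" are all hypothetical, and the threshold is passed in directly rather than read from drill.exec.sort.purge.threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical, simplified model of a top-N queue that purges its
 * retained batches once a threshold is exceeded. Not Drill code.
 */
class TopNPurgeSketch {
  private final int limit;           // N: how many top values to keep
  private final int purgeThreshold;  // max batches retained before consolidating
  private final List<int[]> batches = new ArrayList<>();
  private int purgeCount = 0;

  TopNPurgeSketch(int limit, int purgeThreshold) {
    if (limit <= 0 || purgeThreshold < 1) {
      throw new IllegalArgumentException("limit and purgeThreshold must be positive");
    }
    this.limit = limit;
    this.purgeThreshold = purgeThreshold;
  }

  /** Retains the batch; consolidates once the batch count exceeds the threshold. */
  void add(int[] batch) {
    batches.add(batch.clone());
    if (batches.size() > purgeThreshold) {
      purge();
    }
  }

  /** Replaces all retained batches with one batch holding only the top N values. */
  private void purge() {
    int[] top = selectTop();
    batches.clear();
    batches.add(top);
    purgeCount++;
  }

  /** Returns the largest `limit` values seen so far, in descending order. */
  int[] selectTop() {
    // Min-heap: the smallest retained value is evicted first.
    PriorityQueue<Integer> heap = new PriorityQueue<>();
    for (int[] batch : batches) {
      for (int value : batch) {
        if (heap.size() < limit) {
          heap.add(value);
        } else if (value > heap.peek()) {
          heap.poll();
          heap.add(value);
        }
      }
    }
    int[] result = new int[heap.size()];
    for (int i = result.length - 1; i >= 0; i--) {
      result[i] = heap.poll();  // smallest goes last, largest ends up first
    }
    return result;
  }

  int batchCount() { return batches.size(); }

  int purgeCount() { return purgeCount; }
}
```

    Even in this sketch the consolidated batch still holds up to `limit` values, so purging bounds the number of batches but not the memory needed when the limit itself is very large, which matches the problem tracked in DRILL-5823.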

