Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/984#discussion_r144945381
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java ---
    @@ -20,22 +20,58 @@
     import org.apache.drill.exec.compile.TemplateClassDefinition;
     import org.apache.drill.exec.exception.SchemaChangeException;
     import org.apache.drill.exec.memory.BufferAllocator;
    -import org.apache.drill.exec.ops.FragmentContext;
     import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
     import org.apache.drill.exec.record.VectorContainer;
     import org.apache.drill.exec.record.selection.SelectionVector4;
     
     public interface PriorityQueue {
    -  public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
    -  public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    -  public void generate() throws SchemaChangeException;
    -  public VectorContainer getHyperBatch();
    -  public SelectionVector4 getHeapSv4();
    -  public SelectionVector4 getFinalSv4();
    -  public boolean validate();
    -  public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException;
    -  public void cleanup();
    -
    -  public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class);
    +  /**
    +   * The elements in the given batch are added to the priority queue. Note that the priority queue
    +   * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)}
    +   * method.
    +   * @param batch The batch containing elements we want to add.
    +   * @throws SchemaChangeException
    +   */
    +  void add(RecordBatchData batch) throws SchemaChangeException;
     
    +  /**
    +   * Initializes the priority queue. This method must be called before any other methods on the priority
    +   * queue are called.
    +   * @param limit The size of the priority queue.
    +   * @param allocator The {@link BufferAllocator} to use when creating the priority queue.
    +   * @param hasSv2 True when incoming batches have v2 selection vectors. False otherwise.
    +   * @throws SchemaChangeException
    +   */
    +  void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
    +
    +  /**
    +   * This method must be called before fetching the final heap hyper batch and final Sv4 vector.
    +   * @throws SchemaChangeException
    +   */
    +  void generate() throws SchemaChangeException;
    +
    +  /**
    +   * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called
    +   * after {@link #generate()}.
    +   * @return The final priority queue HyperBatch containing the results.
    +   */
    +  VectorContainer getHyperBatch();
    +
    +  SelectionVector4 getSv4();
    +
    +  /**
    +   * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch
    +   * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}.
    +   * @return The selection vector used to select the elements in the priority queue.
    +   */
    +  SelectionVector4 getFinalSv4();
    --- End diff --
    
    Taking a step back... Suppose I do a top 100. Also, suppose my batches are big, say 100 MB each. Also, suppose I have fiendishly organized my data so that the top 100 values are the first values of 100 different batches.
    
    If the priority queue uses an SV4, then the queue holds onto every batch that contains one of the top values. In my case, since the top 100 values are spread across 100 batches, I'm holding onto 100 batches of 100 MB each, for a total of 10 GB of memory.
    
    Is this how it works?
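    For concreteness, here is a hypothetical sketch (not Drill source) of why a single SV4 entry can pin a whole batch: each 4-byte entry packs a batch index and a record index, so the hyper-batch must keep every referenced batch fully allocated. The two-bytes-each split below is an assumed layout for illustration, as are the class and method names.

```java
// Hypothetical sketch: a 4-byte SV4-style entry addressing one record in a
// hyper-batch. The upper 16 bits select the batch, the lower 16 bits select
// the record within that batch (assumed layout, for illustration only).
public class Sv4Demo {
    static int encode(int batchIndex, int recordIndex) {
        return (batchIndex << 16) | (recordIndex & 0xFFFF);
    }

    static int batchOf(int entry)  { return entry >>> 16; }
    static int recordOf(int entry) { return entry & 0xFFFF; }

    public static void main(String[] args) {
        // A top value living at record 0 of batch 99: this single entry is
        // enough to keep all of batch 99 (100 MB in the example) alive.
        int entry = encode(99, 0);
        System.out.println(batchOf(entry));   // 99
        System.out.println(recordOf(entry));  // 0
    }
}
```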
    
    Do we need to think about compaction at some point? Once we have buffered, say, five batches, we copy the top values to a new, much smaller batch and discard the inputs. We repeat this to keep the number of buffered batches under control.
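    The compaction idea might look something like this simplified sketch. Plain int arrays stand in for value vectors, and `TopNConsolidation`, `LIMIT`, and `BATCH_THRESHOLD` are hypothetical names invented here, not anything in the Drill code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical sketch of the compaction idea (not Drill code): buffer
// incoming batches, and once BATCH_THRESHOLD batches accumulate, copy the
// current top-LIMIT values into one small batch and discard the inputs.
public class TopNConsolidation {
    static final int LIMIT = 3;            // "top N"
    static final int BATCH_THRESHOLD = 5;  // consolidate every 5 batches

    final List<int[]> buffered = new ArrayList<>();

    void add(int[] batch) {
        buffered.add(batch);
        if (buffered.size() >= BATCH_THRESHOLD) {
            consolidate();
        }
    }

    // Copy the top values into a single small batch; the large input
    // batches become unreferenced and can be freed.
    void consolidate() {
        int[] topN = currentTopN();
        buffered.clear();
        buffered.add(topN);
    }

    int[] currentTopN() {
        return buffered.stream()
            .flatMapToInt(IntStream::of)
            .boxed()
            .sorted(Comparator.reverseOrder())
            .limit(LIMIT)
            .mapToInt(Integer::intValue)
            .toArray();
    }

    public static void main(String[] args) {
        TopNConsolidation q = new TopNConsolidation();
        q.add(new int[]{90, 1, 2});
        q.add(new int[]{80, 3, 4});
        q.add(new int[]{70, 5, 6});
        q.add(new int[]{60, 7, 8});
        q.add(new int[]{50, 9, 10});  // fifth batch triggers consolidation
        // Only one small compacted batch remains buffered.
        System.out.println(q.buffered.size());                          // 1
        System.out.println(java.util.Arrays.toString(q.currentTopN())); // [90, 80, 70]
    }
}
```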
    
    Once we control batch size in bytes (we already control record counts), we'll have the means to set a memory budget for TopN, then use consolidation to stay within that budget.
    
    Or, is this how the code already works?

