[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030505#comment-16030505
 ] 

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119258161
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -204,24 +293,157 @@ private int getNumPendingOutput() {
     
         @RuntimeOverridden
         public void setupInterior(@Named("incoming") RecordBatch incoming, 
@Named("outgoing") RecordBatch outgoing,
    -        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) 
{
    +        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) 
throws SchemaChangeException {
         }
     
         @RuntimeOverridden
    -    public void updateAggrValuesInternal(@Named("incomingRowIdx") int 
incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
    +    public void updateAggrValuesInternal(@Named("incomingRowIdx") int 
incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{
         }
     
         @RuntimeOverridden
    -    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, 
@Named("outRowIdx") int outRowIdx) {
    +    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, 
@Named("outRowIdx") int outRowIdx) throws SchemaChangeException{
         }
       }
     
    +  /**
    +   * An internal class to replace "incoming" - instead scanning a spilled 
partition file
    +   */
    +  public class SpilledRecordbatch implements CloseableRecordBatch {
    +    private VectorContainer container = null;
    +    private InputStream spillStream;
    +    private int spilledBatches;
    +    private FragmentContext context;
    +    private BatchSchema schema;
    +    private OperatorContext oContext;
    +    // Path spillStreamPath;
    +    private String spillFile;
    +    VectorAccessibleSerializable vas;
    +
    +    public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ 
int spilledBatches, FragmentContext context, BatchSchema schema, 
OperatorContext oContext) {
    +      this.context = context;
    +      this.schema = schema;
    +      this.spilledBatches = spilledBatches;
    +      this.oContext = oContext;
    +      //this.spillStreamPath = spillStreamPath;
    +      this.spillFile = spillFile;
    +      vas = new VectorAccessibleSerializable(allocator);
    +      container = vas.get();
    +
    +      try {
    +        this.spillStream = spillSet.openForInput(spillFile);
    +      } catch (IOException e) { throw new RuntimeException(e);}
    +
    +      next(); // initialize the container
    +    }
    +
    +    @Override
    +    public SelectionVector2 getSelectionVector2() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public SelectionVector4 getSelectionVector4() {
    +      throw new UnsupportedOperationException();
    +    }
    +
    +    @Override
    +    public TypedFieldId getValueVectorId(SchemaPath path) {
    +      return container.getValueVectorId(path);
    +    }
    +
    +    @Override
    +    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) {
    +      return container.getValueAccessorById(clazz, ids);
    +    }
    +
    +    @Override
    +    public Iterator<VectorWrapper<?>> iterator() {
    +      return container.iterator();
    +    }
    +
    +    @Override
    +    public FragmentContext getContext() { return context; }
    +
    +    @Override
    +    public BatchSchema getSchema() { return schema; }
    +
    +    @Override
    +    public WritableBatch getWritableBatch() {
    +      return WritableBatch.get(this);
    +    }
    +
    +    @Override
    +    public VectorContainer getOutgoingContainer() { return container; }
    +
    +    @Override
    +    public int getRecordCount() { return container.getRecordCount(); }
    +
    +    @Override
    +    public void kill(boolean sendUpstream) {
    +      this.close(); // delete the current spill file
    +    }
    +
    +    /**
    +     * Read the next batch from the spill file
    +     *
    +     * @return IterOutcome
    +     */
    +    @Override
    +    public IterOutcome next() {
    --- End diff --
    
    HashAgg is unique in the way it reads (and processes) the spilled batches 
exactly like reading (and processing) the incoming batches.  Its actual code 
footprint is quite small (mostly in the next() method).
      SpilledRun extends BatchGroup, and seems to have more logic. The two seem 
to have too many differences to bother in combining them.... 


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling memory to disk as the available memory gets too 
> small to allow in memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to