[ 
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847967#comment-15847967
 ] 

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r98593811
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
 ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import com.google.common.base.Stopwatch;
    +
    +/**
    + * Manages a {@link PriorityQueueCopier} instance produced from code 
generation.
    + * Provides a wrapper around a copier "session" to simplify reading batches
    + * from the copier.
    + */
    +
    +public class CopierHolder {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
    +
    +  // Copier obtained from opCodeGen: created on first use by createCopier()
    +  // and reset to null by close().
    +  private PriorityQueueCopier copier;
    +
    +  private final FragmentContext context;
    +  private final BufferAllocator allocator;
    +  private OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Creates a holder that obtains its copier from the given code generator
    +   * on demand.
    +   *
    +   * @param context fragment context passed to the copier's setup
    +   * @param allocator allocator used to create the output vectors; also
    +   * passed to the copier's setup
    +   * @param opCodeGen source of the generated copier; also used to close it
    +   */
    +  public CopierHolder(FragmentContext context, BufferAllocator allocator, 
OperatorCodeGenerator opCodeGen) {
    +    this.context = context;
    +    this.allocator = allocator;
    +    this.opCodeGen = opCodeGen;
    +  }
    +
    +  /**
    +   * Start a merge operation using a temporary vector container. Used for
    +   * intermediate merges.
    +   *
    +   * @param schema schema handed to the merger (presumably the schema of the
    +   * batches being merged -- confirm)
    +   * @param batchGroupList the sorted, buffered batches to be merged
    +   * @param targetRecordCount desired number of records in each merged batch
    +   * @return a new merger that writes into a newly created
    +   * {@link VectorContainer}
    +   */
    +
    +  public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Start a merge operation using the specified vector container. Used for
    +   * the final merge operation.
    +   *
    +   * @param schema schema handed to the merger (presumably the schema of the
    +   * batches being merged -- confirm)
    +   * @param batchGroupList the sorted, buffered batches to be merged
    +   * @param outputContainer caller-supplied container that receives the
    +   * merged records
    +   * @param targetRecordCount desired number of records in each merged batch
    +   * @return a new merger bound to {@code outputContainer}
    +   */
    +  public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, 
List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int 
targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, outputContainer, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Prepare a copier which will merge a collection of sorted batches into
    +   * an output container. The copier uses generated code to do the actual
    +   * writes. If the copier has not yet been created, generate code and
    +   * create it. If it has been created, close it (through opCodeGen) and
    +   * prepare it for a new collection of batches.
    +   * <p>
    +   * Before the copier is set up, one new vector per field of {@code batch}
    +   * is allocated with this holder's allocator and added to
    +   * {@code outputContainer}.
    +   *
    +   * @param batch the (hyper) batch of vectors to be copied
    +   * @param batchGroupList same batches as above, but represented as a list
    +   * of individual batches
    +   * @param outputContainer the container into which to copy the batches
    +   * @throws UserException an "unsupported" error if copier setup reports a
    +   * SchemaChangeException, which is not expected at this point
    +   */
    +
    +  @SuppressWarnings("unchecked")
    +  private void createCopier(VectorAccessible batch, List<? extends 
BatchGroup> batchGroupList, VectorContainer outputContainer) {
    +    if (copier != null) {
    +      // The existing copier object is kept and set up again below.
    +      // NOTE(review): assumes closeCopier() releases the copier's prior
    +      // state without invalidating the object itself -- confirm in
    +      // OperatorCodeGenerator.
    +      opCodeGen.closeCopier();
    +    } else {
    +      copier = opCodeGen.getCopier(batch);
    +    }
    +
    +    // Initialize the value vectors for the output container using this
    +    // holder's allocator
    +
    +    for (VectorWrapper<?> i : batch) {
    +      // v is handed to outputContainer rather than closed here, hence the
    +      // resource-warning suppression.
    +      @SuppressWarnings("resource")
    +      ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
    +      outputContainer.add(v);
    +    }
    +    try {
    +      // This cast of the wildcard list is the reason the method carries
    +      // @SuppressWarnings("unchecked").
    +      copier.setup(context, allocator, batch, (List<BatchGroup>) 
batchGroupList, outputContainer);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change - likely code error.")
    +            .build(logger);
    +    }
    +  }
    +
    +  /**
    +   * @return the allocator supplied to the constructor
    +   */
    +  public BufferAllocator getAllocator() { return allocator; }
    +
    +  /**
    +   * Closes the current copier through opCodeGen and forgets it, so the next
    +   * createCopier() call obtains a fresh copier from opCodeGen.
    +   */
    +  public void close() {
    +    opCodeGen.closeCopier();
    +    copier = null;
    +  }
    +
    +  /**
    +   * We've gathered a set of batches, each of which has been sorted. The 
batches
    +   * may have passed through a filter and thus may have "holes" where rows 
have
    +   * been filtered out. We will spill records in blocks of 
targetRecordCount.
    +   * To prepare, copy that many records into an outputContainer as a set of
    +   * contiguous values in new vectors. The result is a single batch with
    +   * vectors that combine a collection of input batches up to the
    +   * given threshold.
    +   * <p>
    +   * Input (selection vector, data vector):<pre>
    +   * [3 7 4 0 8 1 6] [5 3 6 8 2 0]
    +   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
    +   * <p>
    +   * Output (assuming blocks of 5 records, data vectors only):<pre>
    +   * [abdef] [hikmn] [pqr]</pre>
    +   * <p>
    +   * The copying operation does a merge as well: copying
    +   * values from the sources in ordered fashion.
    +   * <pre>
    +   * Input:  [aceg] [bdfh]
    +   * Output: [abcdefgh]</pre>
    +   * <p>
    +   * Here we bind the copier to the batchGroupList of sorted, buffered 
batches
    +   * to be merged. We bind the copier output to outputContainer: the 
copier will write its
    +   * merged "batches" of records to that container.
    +   * <p>
    +   * Calls to the {@link #next()} method sequentially return merged batches
    +   * of the desired row count.
    +    */
    +
    +  public static class BatchMerger implements SortResults, AutoCloseable {
    +
    +    private CopierHolder holder;
    +    private VectorContainer hyperBatch;
    +    private VectorContainer outputContainer;
    +    private int targetRecordCount;
    +    private int copyCount;
    +    private int batchCount;
    +
    +    /**
    +     * Creates a merger with a temporary output container: a new, empty
    +     * {@link VectorContainer} created here and passed to the
    +     * five-argument constructor.
    +     *
    +     * @param holder the CopierHolder that created this merger
    +     * @param schema schema of the batches being merged (presumably -- confirm)
    +     * @param batchGroupList the sorted, buffered batches to be merged
    +     * @param targetRecordCount desired number of records in each merged batch
    +     */
    +    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +      this(holder, schema, batchGroupList, new VectorContainer(), 
targetRecordCount);
    +    }
    +
    +    /**
    +     * Creates a merger with the specified output container
    +     *
    +     * @param holder
    +     * @param batchGroupList
    --- End diff --
    
    ditto ...


> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that 
> works to a clearly-defined memory limit. Attached is a design specification 
> for the work.
> The project will include fixing a number of bugs related to the external 
> sort, included as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to