[ 
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847977#comment-15847977
 ] 

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r98594425
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
 ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import com.google.common.base.Stopwatch;
    +
    +/**
    + * Manages a {@link PriorityQueueCopier} instance produced from code 
generation.
    + * Provides a wrapper around a copier "session" to simplify reading batches
    + * from the copier.
    + */
    +
    +public class CopierHolder {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
    +
    +  private PriorityQueueCopier copier;
    +
    +  private final FragmentContext context;
    +  private final BufferAllocator allocator;
    +  private OperatorCodeGenerator opCodeGen;
    +
    +  public CopierHolder(FragmentContext context, BufferAllocator allocator, 
OperatorCodeGenerator opCodeGen) {
    +    this.context = context;
    +    this.allocator = allocator;
    +    this.opCodeGen = opCodeGen;
    +  }
    +
    +  /**
    +   * Start a merge operation using a temporary vector container. Used for
    +   * intermediate merges.
    +   *
    +   * @param schema
    +   * @param batchGroupList
    +   * @param targetRecordCount
    +   * @return
    +   */
    +
    +  public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Start a merge operation using the specified vector container. Used for
    +   * the final merge operation.
    +   *
    +   * @param schema
    +   * @param batchGroupList
    +   * @param outputContainer
    +   * @param targetRecordCount
    +   * @return
    +   */
    +  public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, 
List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int 
targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, outputContainer, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Prepare a copier which will write a collection of vectors to disk. 
The copier
    +   * uses generated code to to the actual writes. If the copier has not 
yet been
    +   * created, generated code and create it. If it has been created, close 
it and
    +   * prepare it for a new collection of batches.
    +   *
    +   * @param batch the (hyper) batch of vectors to be copied
    +   * @param batchGroupList same batches as above, but represented as a list
    +   * of individual batches
    +   * @param outputContainer the container into which to copy the batches
    +   * @param allocator allocator to use to allocate memory in the operation
    +   */
    +
    +  @SuppressWarnings("unchecked")
    +  private void createCopier(VectorAccessible batch, List<? extends 
BatchGroup> batchGroupList, VectorContainer outputContainer) {
    +    if (copier != null) {
    +      opCodeGen.closeCopier();
    +    } else {
    +      copier = opCodeGen.getCopier(batch);
    +    }
    +
    +    // Initialize the value vectors for the output container using the
    +    // allocator provided
    +
    +    for (VectorWrapper<?> i : batch) {
    +      @SuppressWarnings("resource")
    +      ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
    +      outputContainer.add(v);
    +    }
    +    try {
    +      copier.setup(context, allocator, batch, (List<BatchGroup>) 
batchGroupList, outputContainer);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change - likely code error.")
    +            .build(logger);
    +    }
    +  }
    +
    +  public BufferAllocator getAllocator() { return allocator; }
    +
    +  public void close() {
    +    opCodeGen.closeCopier();
    +    copier = null;
    +  }
    +
    +  /**
    +   * We've gathered a set of batches, each of which has been sorted. The 
batches
    +   * may have passed through a filter and thus may have "holes" where rows 
have
    +   * been filtered out. We will spill records in blocks of 
targetRecordCount.
    +   * To prepare, copy that many records into an outputContainer as a set of
    +   * contiguous values in new vectors. The result is a single batch with
    +   * vectors that combine a collection of input batches up to the
    +   * given threshold.
    +   * <p>
    +   * Input (selection vector, data vector):<pre>
    +   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
    +   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
    +   * <p>
    +   * Output (assuming blocks of 5 records, data vectors only):<pre>
    +   * [abcde] [fhikm] [npqr]</pre>
    +   * <p>
    +   * The copying operation does a merge as well: copying
    +   * values from the sources in ordered fashion.
    +   * <pre>
    +   * Input:  [aceg] [bdfh]
    +   * Output: [abcdefgh]</pre>
    +   * <p>
    +   * Here we bind the copier to the batchGroupList of sorted, buffered 
batches
    +   * to be merged. We bind the copier output to outputContainer: the 
copier will write its
    +   * merged "batches" of records to that container.
    +   * <p>
    +   * Calls to the {@link #next()} method sequentially return merged batches
    +   * of the desired row count.
    +    */
    +
    +  public static class BatchMerger implements SortResults, AutoCloseable {
    +
    +    private CopierHolder holder;
    +    private VectorContainer hyperBatch;
    +    private VectorContainer outputContainer;
    +    private int targetRecordCount;
    +    private int copyCount;
    +    private int batchCount;
    +
    +    /**
    +     * Creates a merger with an temporary output container.
    +     *
    +     * @param holder
    +     * @param batchGroupList
    +     * @param targetRecordCount
    +     */
    +    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +      this(holder, schema, batchGroupList, new VectorContainer(), 
targetRecordCount);
    +    }
    +
    +    /**
    +     * Creates a merger with the specified output container
    +     *
    +     * @param holder
    +     * @param batchGroupList
    +     * @param outputContainer
    +     * @param targetRecordCount
    +     */
    +    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, VectorContainer outputContainer, int 
targetRecordCount) {
    +      this.holder = holder;
    +      hyperBatch = constructHyperBatch(schema, batchGroupList);
    +      copyCount = 0;
    +      this.targetRecordCount = targetRecordCount;
    +      this.outputContainer = outputContainer;
    +      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
    +    }
    +
    +    /**
    +     * Return the output container.
    +     *
    +     * @return
    +     */
    +    public VectorContainer getOutput() {
    +      return outputContainer;
    +    }
    +
    +    /**
    +     * Read the next merged batch. The batch holds the specified row 
count, but
    +     * may be less if this is the last batch.
    +     *
    +     * @return the number of rows in the batch, or 0 if no more batches
    +     * are available
    +     */
    +
    +    @Override
    +    public boolean next() {
    +      Stopwatch w = Stopwatch.createStarted();
    +      int count = holder.copier.next(targetRecordCount);
    +      copyCount += count;
    +      if (count > 0) {
    +        long t = w.elapsed(TimeUnit.MICROSECONDS);
    +        logger.trace("Took {} us to merge {} records", t, count);
    +      } else {
    +        logger.trace("copier returned 0 records");
    +      }
    +      batchCount++;
    +
    +      // Identify the schema to be used in the output container. (Since
    +      // all merged batches have the same schema, the schema we identify
    +      // here should be the same as that which we already had.
    +
    +      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +      // The copier does not set the record count in the output
    +      // container, so do that here.
    +
    +      outputContainer.setRecordCount(count);
    +
    +      return count > 0;
    +    }
    +
    +    /**
    +     * Construct a vector container that holds a list of batches, each 
represented as an
    +     * array of vectors. The entire collection of vectors has a common 
schema.
    +     * <p>
    +     * To build the collection, we go through the current schema (which 
has been
    +     * devised to be common for all batches.) For each field in the 
schema, we create
    +     * an array of vectors. To create the elements, we iterate over all 
the incoming
    +     * batches and search for the vector that matches the current column.
    +     * <p>
    +     * Finally, we build a new schema for the combined container. That new 
schema must,
    +     * because of the way the container was created, match the current 
schema.
    +     *
    +     * @param batchGroupList list of batches to combine
    --- End diff --
    
    Third time "schema" is missing ....


> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that 
> works to a clearly-defined memory limit. Attached is a design specification 
> for the work.
> The project will include fixing a number of bugs related to the external 
> sort, include as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to