Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/860#discussion_r124430031
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java ---
    @@ -19,7 +19,125 @@
     
     import com.google.common.annotations.VisibleForTesting;
     
    +/**
    + * Computes the memory needs for input batches, spill batches and merge
    + * batches. The key challenges that this code tries to overcome are:
    + * <ul>
    + * <li>Drill is not designed for small memory allocations,
    + * but the planner may provide such allocations because the memory per
    + * query is divided among slices (minor fragments) and among buffering
    + * operators, leaving very little per operator.</li>
    + * <li>Drill does not provide the detailed memory information needed to
    + * carefully manage memory in tight constraints.</li>
    + * <li>But, Drill has a death penalty for going over the memory limit.</li>
    + * </ul>
    + * As a result, this class is a bit of a hack: it attempts to consider a
    + * number of ill-defined factors in order to divide up memory use in a
    + * way that prevents OOM errors.
    + * <p>
    + * First, it is necessary to differentiate two concepts:
    + * <ul>
    + * <li>The <i>data size</i> of a batch: the amount of memory needed to hold
    + * the data itself. The data size is constant for any given batch.</li>
    + * <li>The <i>buffer size</i> of the buffers that hold the data. The buffer
    + * size varies wildly depending on how the batch was produced.</li>
    + * </ul>
    + * The three kinds of buffer layouts seen to date include:
    + * <ul>
    + * <li>One buffer per vector component (data, offsets, null flags, etc.)
    + * &ndash; created by readers, project and other operators.</li>
    + * <li>One buffer for the entire batch, with each vector component using
    + * a slice of the overall buffer &ndash; the case for batches deserialized from
    + * exchanges.</li>
    + * <li>One buffer for each top-level vector, with component vectors
    + * using slices of the overall vector buffer &ndash; the result of reading
    + * spilled batches from disk.</li>
    + * </ul>
    + * In each case, buffer sizes are power-of-two rounded from the data size.
    + * But since the data is grouped differently in each case, the resulting buffer
    + * sizes vary considerably.
    + * <p>
    + * As a result, we can never be sure of the amount of memory needed for a
    + * batch. So, we have to estimate based on a number of factors:
    + * <ul>
    + * <li>Use the {@link RecordBatchSizer} to estimate the data size and
    + * buffer size of each incoming batch.</li>
    + * <li>Estimate the internal fragmentation due to power-of-two
    + * rounding.</li>
    + * <li>Honor configured preferences for spill and output batches.</li>
    + * The code handles "normal" and "low" memory conditions.
    + * <ul>
    + * <li>In normal memory, we simply work out the number of preferred-size
    + * batches that fit in memory (based on the predicted buffer size).</li>
    + * <li>In low memory, we divide up the available memory to produce the
    + * spill and merge batch sizes. The sizes will be less than the configured
    + * preference.</li>
    + * </ul>
    + */
    +
     public class SortMemoryManager {
    +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +
    +  /**
    +   * Estimate for typical internal fragmentation in a buffer due to power-of-two
    +   * rounding on vectors.
    +   * <p>
    +   * <pre>[____|__$__]</pre>
    +   * In the above, the brackets represent the whole vector. The
    +   * first half is always full. When the first half was filled, the second
    +   * half was allocated. On average, the second half will be half full.
    +   * This means that, on average, 1/4 of the allocated space is
    +   * unused (the definition of internal fragmentation).
    +   */
    +
    +  public static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0/4.0;
    +
    +  /**
    +   * Given a buffer, this is the assumed amount of space
    +   * available for data. (Adding more will double the buffer
    +   * size half the time.)
    +   */
    +
    +  public static final double PAYLOAD_MULTIPLIER = 1 - INTERNAL_FRAGMENTATION_ESTIMATE;
    +
    +  /**
    +   * Given a data size, this is the multiplier to create the buffer
    +   * size estimate. (Note: since we work with aggregate batches, we
    +   * cannot simply round up to the next power of two: rounding is done
    +   * on a vector-by-vector basis. Here we need to estimate the aggregate
    +   * effect of rounding.)
    +   */
    +
    +  public static final double BUFFER_MULTIPLIER = 1/PAYLOAD_MULTIPLIER;
    +  public static final double WORST_CASE_MULTIPLIER = 2.0;
    +
    +  public static final int MIN_ROWS_PER_SORT_BATCH = 100;
    --- End diff --
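    For reference, the sizing math these constants encode can be sketched standalone.
    The class and method names below are hypothetical illustrations, not the Drill
    code itself; only the constant values are taken from the diff:

```java
// Standalone illustration of the sizing math encoded by the constants
// in the diff above (SortSizingSketch is a hypothetical name).
public class SortSizingSketch {

    // On average, power-of-two rounding leaves the top half of a vector
    // half empty: 1/4 of the allocation is internal fragmentation.
    static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0 / 4.0;

    // Fraction of a buffer assumed usable for data.
    static final double PAYLOAD_MULTIPLIER = 1 - INTERNAL_FRAGMENTATION_ESTIMATE;

    // Data size -> estimated buffer size (aggregate effect of rounding).
    static final double BUFFER_MULTIPLIER = 1 / PAYLOAD_MULTIPLIER;

    // Upper bound: every vector just doubled in size.
    static final double WORST_CASE_MULTIPLIER = 2.0;

    static long estimatedBufferSize(long dataSize) {
        return Math.round(dataSize * BUFFER_MULTIPLIER);
    }

    static long worstCaseBufferSize(long dataSize) {
        return Math.round(dataSize * WORST_CASE_MULTIPLIER);
    }

    public static void main(String[] args) {
        long dataSize = 12 * 1024 * 1024;                  // 12 MB of data
        System.out.println(estimatedBufferSize(dataSize)); // 16777216 (16 MB)
        System.out.println(worstCaseBufferSize(dataSize)); // 25165824 (24 MB)
    }
}
```

    So a 12 MB data payload is expected to need about 16 MB of buffers on
    average, and up to 24 MB in the worst case.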
    
    Possibly make the min rows a configurable option, so it could be tested
    with different values.
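    
    A minimal sketch of that suggestion, using java.util.Properties as a
    stand-in for Drill's real config machinery; the class name and option
    key are hypothetical:

```java
import java.util.Properties;

// Hedged sketch of making the minimum configurable. Properties stands in
// for Drill's actual config machinery, and the key name is hypothetical.
public class MinRowsConfigSketch {

    static final String MIN_ROWS_KEY = "drill.exec.sort.external.min_rows"; // hypothetical key
    static final int MIN_ROWS_DEFAULT = 100;

    // Falls back to the current hard-coded value when the key is absent.
    static int minRowsPerSortBatch(Properties props) {
        return Integer.parseInt(
            props.getProperty(MIN_ROWS_KEY, Integer.toString(MIN_ROWS_DEFAULT)));
    }

    public static void main(String[] args) {
        Properties tuned = new Properties();
        tuned.setProperty(MIN_ROWS_KEY, "25");
        System.out.println(minRowsPerSortBatch(new Properties())); // 100
        System.out.println(minRowsPerSortBatch(tuned));            // 25
    }
}
```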

