[ 
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850646#comment-15850646
 ] 

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99037775
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
 ---
    @@ -0,0 +1,334 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.Iterator;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.TransferPair;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +import com.google.common.base.Stopwatch;
    +
    +/**
    + * Represents a group of batches spilled to disk.
    + * <p>
    + * The batches are defined by a schema which can change over time. When 
the schema changes,
    + * all existing and new batches are coerced into the new schema. Provides a
    + * uniform way to iterate over records for one or more batches whether
    + * the batches are in memory or on disk.
    + * <p>
    + * The <code>BatchGroup</code> operates in two modes as given by the two
    + * subclasses:
    + * <ul>
    + * <li>Input mode (@link InputBatchGroup): Used to buffer in-memory batches
    + * prior to spilling.</li>
    + * <li>Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
    + * of batches written to disk. Acts as both a reader and writer for
    + * those batches.</li>
    + */
    +
    +public abstract class BatchGroup implements VectorAccessible, 
AutoCloseable {
    +  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
    +
    +  /**
    +   * The input batch group gathers batches buffered in memory before
    +   * spilling. The structure of the data is:
    +   * <ul>
    +   * <li>Contains a single batch received from the upstream (input)
    +   * operator.</li>
    +   * <li>Associated selection vector that provides a sorted
    +   * indirection to the values in the batch.</li>
    +   * </ul>
    +   */
    +
    +  public static class InputBatch extends BatchGroup {
    +    private SelectionVector2 sv2;
    +
    +    public InputBatch(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context, long batchSize) {
    +      super(container, context, batchSize);
    +      this.sv2 = sv2;
    +    }
    +
    +    public SelectionVector2 getSv2() {
    +      return sv2;
    +    }
    +
    +    @Override
    +    public int getRecordCount() {
    +      if (sv2 != null) {
    +        return sv2.getCount();
    +      } else {
    +        return super.getRecordCount();
    +      }
    +    }
    +
    +    @Override
    +    public int getNextIndex() {
    +      int val = super.getNextIndex();
    +      if (val == -1) {
    +        return val;
    +      }
    +      return sv2.getIndex(val);
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +      try {
    +        super.close();
    +      }
    +      finally {
    +        if (sv2 != null) {
    +          sv2.clear();
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Holds a set of spilled batches, represented by a file on disk.
    +   * Handles reads from, and writes to the spill file. The data structure
    +   * is:
    +   * <ul>
    +   * <li>A pointer to a file that contains serialized batches.</li>
    +   * <li>When writing, each batch is appended to the output file.</li>
    +   * <li>When reading, iterates over each spilled batch, and for each
    +   * of those, each spilled record.</li>
    +   * </ul>
    +   * <p>
    +   * Starts out with no current batch. Defines the current batch to be the
    +   * (shell: schema without data) of the last batch spilled to disk.
    +   * <p>
    +   * When reading, has destructive read-once behavior: closing the
    +   * batch (after reading) deletes the underlying spill file.
    +   * <p>
    +   * This single class does three tasks: load data, hold data and
    +   * read data. This should be split into three separate classes. But,
    +   * the original (combined) structure is retained for expedience at
    +   * present.
    +   */
    +
    +  public static class SpilledRun extends BatchGroup {
    +    private InputStream inputStream;
    +    private OutputStream outputStream;
    +    private String path;
    +    private SpillSet spillSet;
    +    private BufferAllocator allocator;
    +    private int spilledBatches = 0;
    +
    +    public SpilledRun(SpillSet spillSet, String path, OperatorContext 
context, long batchSize) throws IOException {
    +      super(null, context, batchSize);
    +      this.spillSet = spillSet;
    +      this.path = path;
    +      this.allocator = context.getAllocator();
    +      outputStream = spillSet.openForOutput(path);
    +    }
    +
    +    public void addBatch(VectorContainer newContainer) throws IOException {
    +      int recordCount = newContainer.getRecordCount();
    +      @SuppressWarnings("resource")
    +      WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, 
newContainer, false);
    +      VectorAccessibleSerializable outputBatch = new 
VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      outputBatch.writeToStream(outputStream);
    +      newContainer.zeroVectors();
    +      logger.trace("Wrote {} records in {} us", recordCount, 
watch.elapsed(TimeUnit.MICROSECONDS));
    +      spilledBatches++;
    +
    +      // Hold onto the husk of the last added container so that we have a
    +      // current container when starting to read rows back later.
    +
    +      currentContainer = newContainer;
    +      currentContainer.setRecordCount(0);
    +    }
    +
    +    @Override
    +    public int getNextIndex() {
    +      if (pointer == getRecordCount()) {
    +        if (spilledBatches == 0) {
    +          return -1;
    +        }
    +        try {
    +          currentContainer.zeroVectors();
    +          getBatch();
    +        } catch (IOException e) {
    +          // Release any partially-loaded data.
    +          currentContainer.clear();
    +          throw UserException.dataReadError(e)
    +              .message("Failure while reading spilled data")
    +              .build(logger);
    +        }
    +        pointer = 1;
    --- End diff --
    
    This is original code. The logic seems to be that the method returns 0 (the 
next index) and advances the pointer to the position after that (1). See the 
code in the base class method for how this works elsewhere.


> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10.0
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that 
> works to a clearly-defined memory limit. Attached is a design specification 
> for the work.
> The project will include fixing a number of bugs related to the external 
> sort, include as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to