Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r98582657
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
 ---
    @@ -0,0 +1,334 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.Iterator;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.TransferPair;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +
    +import com.google.common.base.Stopwatch;
    +
    +/**
    + * Represents a group of batches spilled to disk.
    + * <p>
    + * The batches are defined by a schema which can change over time. When 
the schema changes,
    + * all existing and new batches are coerced into the new schema. Provides a
    + * uniform way to iterate over records for one or more batches whether
    + * the batches are in memory or on disk.
    + * <p>
    + * The <code>BatchGroup</code> operates in two modes as given by the two
    + * subclasses:
    + * <ul>
    + * <li>Input mode (@link InputBatchGroup): Used to buffer in-memory batches
    + * prior to spilling.</li>
    + * <li>Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
    + * of batches written to disk. Acts as both a reader and writer for
    + * those batches.</li>
    + */
    +
    +public abstract class BatchGroup implements VectorAccessible, 
AutoCloseable {
    +  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
    +
    +  /**
    +   * The input batch group gathers batches buffered in memory before
    +   * spilling. The structure of the data is:
    +   * <ul>
    +   * <li>Contains a single batch received from the upstream (input)
    +   * operator.</li>
    +   * <li>Associated selection vector that provides a sorted
    +   * indirection to the values in the batch.</li>
    +   * </ul>
    +   */
    +
    +  public static class InputBatch extends BatchGroup {
    +    private SelectionVector2 sv2;
    +
    +    public InputBatch(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context, long batchSize) {
    +      super(container, context, batchSize);
    +      this.sv2 = sv2;
    +    }
    +
    +    public SelectionVector2 getSv2() {
    +      return sv2;
    +    }
    +
    +    @Override
    +    public int getRecordCount() {
    +      if (sv2 != null) {
    +        return sv2.getCount();
    +      } else {
    +        return super.getRecordCount();
    +      }
    +    }
    +
    +    @Override
    +    public int getNextIndex() {
    +      int val = super.getNextIndex();
    +      if (val == -1) {
    +        return val;
    +      }
    +      return sv2.getIndex(val);
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +      try {
    +        super.close();
    +      }
    +      finally {
    +        if (sv2 != null) {
    +          sv2.clear();
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Holds a set of spilled batches, represented by a file on disk.
    +   * Handles reads from, and writes to the spill file. The data structure
    +   * is:
    +   * <ul>
    +   * <li>A pointer to a file that contains serialized batches.</li>
    +   * <li>When writing, each batch is appended to the output file.</li>
    +   * <li>When reading, iterates over each spilled batch, and for each
    +   * of those, each spilled record.</li>
    +   * </ul>
    +   * <p>
    +   * Starts out with no current batch. Defines the current batch to be the
    +   * (shell: schema without data) of the last batch spilled to disk.
    +   * <p>
    +   * When reading, has destructive read-once behavior: closing the
    +   * batch (after reading) deletes the underlying spill file.
    +   * <p>
    +   * This single class does three tasks: load data, hold data and
    +   * read data. This should be split into three separate classes. But,
    +   * the original (combined) structure is retained for expedience at
    +   * present.
    +   */
    +
    +  public static class SpilledRun extends BatchGroup {
    +    private InputStream inputStream;
    +    private OutputStream outputStream;
    +    private String path;
    +    private SpillSet spillSet;
    +    private BufferAllocator allocator;
    +    private int spilledBatches = 0;
    +
    +    public SpilledRun(SpillSet spillSet, String path, OperatorContext 
context, long batchSize) throws IOException {
    +      super(null, context, batchSize);
    +      this.spillSet = spillSet;
    +      this.path = path;
    +      this.allocator = context.getAllocator();
    +      outputStream = spillSet.openForOutput(path);
    +    }
    +
    +    public void addBatch(VectorContainer newContainer) throws IOException {
    +      int recordCount = newContainer.getRecordCount();
    +      @SuppressWarnings("resource")
    +      WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, 
newContainer, false);
    +      VectorAccessibleSerializable outputBatch = new 
VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      outputBatch.writeToStream(outputStream);
    +      newContainer.zeroVectors();
    +      logger.trace("Wrote {} records in {} us", recordCount, 
watch.elapsed(TimeUnit.MICROSECONDS));
    +      spilledBatches++;
    +
    +      // Hold onto the husk of the last added container so that we have a
    +      // current container when starting to read rows back later.
    +
    +      currentContainer = newContainer;
    +      currentContainer.setRecordCount(0);
    +    }
    +
    +    @Override
    +    public int getNextIndex() {
    +      if (pointer == getRecordCount()) {
    +        if (spilledBatches == 0) {
    +          return -1;
    +        }
    +        try {
    +          currentContainer.zeroVectors();
    +          getBatch();
    +        } catch (IOException e) {
    +          // Release any partially-loaded data.
    +          currentContainer.clear();
    +          throw UserException.dataReadError(e)
    +              .message("Failure while reading spilled data")
    +              .build(logger);
    +        }
    +        pointer = 1;
    +        return 0;
    +      }
    +      return super.getNextIndex();
    +    }
    +
    +    private VectorContainer getBatch() throws IOException {
    +      if (inputStream == null) {
    +        inputStream = spillSet.openForInput(path);
    +      }
    +      VectorAccessibleSerializable vas = new 
VectorAccessibleSerializable(allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      vas.readFromStream(inputStream);
    +      VectorContainer c =  vas.get();
    +      if (schema != null) {
    +        c = SchemaUtil.coerceContainer(c, schema, context);
    +      }
    +      logger.trace("Read {} records in {} us", c.getRecordCount(), 
watch.elapsed(TimeUnit.MICROSECONDS));
    +      spilledBatches--;
    +      currentContainer.zeroVectors();
    +      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
    +      for (@SuppressWarnings("rawtypes") VectorWrapper w : 
currentContainer) {
    +        TransferPair pair = 
wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
    +        pair.transfer();
    +      }
    +      currentContainer.setRecordCount(c.getRecordCount());
    +      c.zeroVectors();
    +      return c;
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +      try {
    +        super.close();
    +      }
    +      finally {
    +        try {
    +          closeOutputStream();
    +        } finally {
    +          try {
    +            if (inputStream != null) {
    +              inputStream.close();
    +              inputStream = null;
    +              logger.trace("Summary: Read {} bytes from {}", dataSize, 
path);
    +            }
    +          }
    +          finally {
    +            spillSet.delete(path);
    +          }
    +        }
    +      }
    +    }
    +
    +    public void closeOutputStream() throws IOException {
    --- End diff --
    
    A minor comment: This method could be made generic - take a stream (either 
inputStream or outputStream) and a string for the logger ("Read" or "Wrote"), 
and then use it also to replace lines 232-236 above.
    (No need to reset the stream to null; we're closing anyway.)
    And as mentioned, the logger.trace() line should come before the close().



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to