DRILL-5275: Sort spill is slow due to repeated allocations. Rather than creating a heap buffer per vector when writing and reading, the revised code creates a single, shared buffer used for all I/O within a particular container. This improves performance by reducing GC and CPU costs during I/O.
Move I/O buffer, and methods to allocator Allows the buffer to be shared. Especially in the sort, this is important, as the sort may have many serializations open at once. closes #754 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6bc398fc Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6bc398fc Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6bc398fc Branch: refs/heads/master Commit: 6bc398fc55bcbf73f1f882de67f146927d1d09d0 Parents: 38f816a Author: Paul Rogers <prog...@maprtech.com> Authored: Sun Feb 19 17:53:31 2017 -0800 Committer: Sudheesh Katkam <sudhe...@apache.org> Committed: Fri Feb 24 18:41:48 2017 -0800 ---------------------------------------------------------------------- .../cache/VectorAccessibleSerializable.java | 9 ++--- .../apache/drill/exec/memory/BaseAllocator.java | 37 ++++++++++++++++++++ .../drill/exec/memory/BufferAllocator.java | 29 +++++++++++++++ 3 files changed, 71 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index e3bf5bd..89876af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -90,6 +90,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { * @param input the InputStream to read from * @throws IOException */ + @SuppressWarnings("resource") @Override public void readFromStream(InputStream input) throws 
IOException { final VectorContainer container = new VectorContainer(); @@ -112,7 +113,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { final DrillBuf buf = allocator.buffer(dataLength); final ValueVector vector; try { - buf.writeBytes(input, dataLength); + allocator.read(buf, input, dataLength); vector = TypeHelper.getNewVector(field, allocator); vector.load(metaData, buf); } finally { @@ -136,6 +137,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { * @param output the OutputStream to write to * @throws IOException */ + @SuppressWarnings("resource") @Override public void writeToStream(OutputStream output) throws IOException { Preconditions.checkNotNull(output); @@ -159,7 +161,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { /* If we have a selection vector, dump it to file first */ if (svBuf != null) { - svBuf.getBytes(0, output, svBuf.readableBytes()); + allocator.write(svBuf, output); sv2.setBuffer(svBuf); svBuf.release(); // sv2 now owns the buffer sv2.setRecordCount(svCount); @@ -168,8 +170,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { /* Dump the array of ByteBuf's associated with the value vectors */ for (DrillBuf buf : incomingBuffers) { /* dump the buffer into the OutputStream */ - int bufLength = buf.readableBytes(); - buf.getBytes(0, output, bufLength); + allocator.write(buf, output); } output.flush(); http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java index 1245e86..ba47998 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java +++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java @@ -21,6 +21,9 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.DrillBuf; import io.netty.buffer.UnsafeDirectLittleEndian; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.IdentityHashMap; import java.util.Set; @@ -789,4 +792,38 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato public static boolean isDebug() { return DEBUG; } + + /** + * Disk I/O buffer used for all reads and writes of DrillBufs. + */ + + private byte ioBuffer[]; + + public byte[] getIOBuffer() { + if (ioBuffer == null) { + ioBuffer = new byte[32*1024]; + } + return ioBuffer; + } + + public void read(DrillBuf buf, InputStream in, int length) throws IOException { + buf.clear(); + + byte[] buffer = getIOBuffer(); + for (int posn = 0; posn < length; posn += buffer.length) { + int len = Math.min(buffer.length, length - posn); + in.read(buffer, 0, len); + buf.writeBytes(buffer, 0, len); + } + } + + public void write(DrillBuf buf, OutputStream out) throws IOException { + byte[] buffer = getIOBuffer(); + int bufLength = buf.readableBytes(); + for (int posn = 0; posn < bufLength; posn += buffer.length) { + int len = Math.min(buffer.length, bufLength - posn); + buf.getBytes(posn, buffer, 0, len); + out.write(buffer, 0, len); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/6bc398fc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java index 64f7d86..3c5f57d 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java +++ 
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java @@ -20,6 +20,10 @@ package org.apache.drill.exec.memory; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.DrillBuf; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.BufferManager; @@ -151,4 +155,29 @@ public interface BufferAllocator extends AutoCloseable { * a no-op. */ public void assertOpen(); + + /** + * Write the contents of a DrillBuf to a stream. Use this method, rather + * than calling the DrillBuf.getBytes() method, because this method + * avoids repeated heap allocation for the intermediate heap buffer. + * + * @param buf the Drillbuf to write + * @param output the output stream + * @throws IOException if a write error occurs + */ + + public void write(DrillBuf buf, OutputStream out) throws IOException; + + /** + * Read the contents of a DrillBuf from a stream. Use this method, rather + * than calling the DrillBuf.writeBytes() method, because this method + * avoids repeated heap allocation for the intermediate heap buffer. + * + * @param buf the buffer to read with space already allocated + * @param input input stream from which to read data + * @param bufLength number of bytes to read + * @throws IOException if a read error occurs + */ + + public void read(DrillBuf buf, InputStream in, int length) throws IOException; }