This is an automated email from the ASF dual-hosted git repository. omalley pushed a commit to branch storage-branch-2.8 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/storage-branch-2.8 by this push: new b304acd HIVE-25190: Fix many small allocations in BytesColumnVector b304acd is described below commit b304acd36334e54d276e9ded5851ae24d2f23595 Author: Owen O'Malley <oomal...@linkedin.com> AuthorDate: Fri Jun 18 16:30:13 2021 -0700 HIVE-25190: Fix many small allocations in BytesColumnVector Fixes #2408 Signed-off-by: Owen O'Malley <oomal...@linkedin.com> --- storage-api/pom.xml | 2 +- .../hive/ql/exec/vector/BytesColumnVector.java | 163 ++++++++++----------- .../hive/ql/exec/vector/TestBytesColumnVector.java | 124 ++++++++++++++-- 3 files changed, 187 insertions(+), 102 deletions(-) diff --git a/storage-api/pom.xml b/storage-api/pom.xml index 53fa3c0..c87aed7 100644 --- a/storage-api/pom.xml +++ b/storage-api/pom.xml @@ -185,7 +185,7 @@ <version>3.0.0-M4</version> <configuration> <reuseForks>false</reuseForks> - <argLine>-Xmx2048m</argLine> + <argLine>-Xmx3g</argLine> <failIfNoTests>false</failIfNoTests> <systemPropertyVariables> <test.tmp.dir>${project.build.directory}/tmp</test.tmp.dir> diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java index 6618807..a8c58ac 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java @@ -49,14 +49,15 @@ public class BytesColumnVector extends ColumnVector { */ public int[] length; - // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to - // a byte[] with sufficient space for the specified size. - private byte[] buffer; // optional buffer to use when actually copying in data - private int nextFree; // next free position in buffer + // Calls to ensureValPreallocated() ensure that currentValue and currentOffset + // are set to enough space for the value. + private byte[] currentValue; // bytes for the next value + private int currentOffset; // starting position in the current buffer - // Hang onto a byte array for holding smaller byte values - private byte[] smallBuffer; - private int smallBufferNextFree; + // A shared static buffer allocation that we use for the small values + private byte[] sharedBuffer; + // The next unused offset in the sharedBuffer. + private int sharedBufferOffset; private int bufferAllocationCount; @@ -66,8 +67,11 @@ public class BytesColumnVector extends ColumnVector { // Proportion of extra space to provide when allocating more buffer space. static final float EXTRA_SPACE_FACTOR = (float) 1.2; - // Largest size allowed in smallBuffer - static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024; + // Largest item size allowed in sharedBuffer + static final int MAX_SIZE_FOR_SMALL_ITEM = 1024 * 1024; + + // Largest size allowed for sharedBuffer + static final int MAX_SIZE_FOR_SHARED_BUFFER = 1024 * 1024 * 1024; /** * Use this constructor for normal operation. @@ -121,30 +125,30 @@ public class BytesColumnVector extends ColumnVector { * Provide the estimated number of bytes needed to hold * a full column vector worth of byte string data. * - * @param estimatedValueSize Estimated size of buffer space needed + * @param estimatedValueSize Estimated size of buffer space needed per row */ public void initBuffer(int estimatedValueSize) { - nextFree = 0; - smallBufferNextFree = 0; + sharedBufferOffset = 0; // if buffer is already allocated, keep using it, don't re-allocate - if (buffer != null) { + if (sharedBuffer != null) { // Free up any previously allocated buffers that are referenced by vector if (bufferAllocationCount > 0) { for (int idx = 0; idx < vector.length; ++idx) { vector[idx] = null; length[idx] = 0; } - buffer = smallBuffer; // In case last row was a large bytes value } } else { // allocate a little extra space to limit need to re-allocate - int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR); + long bufferSize = (long) (this.vector.length * estimatedValueSize * EXTRA_SPACE_FACTOR); if (bufferSize < DEFAULT_BUFFER_SIZE) { bufferSize = DEFAULT_BUFFER_SIZE; } - buffer = new byte[bufferSize]; - smallBuffer = buffer; + if (bufferSize > MAX_SIZE_FOR_SHARED_BUFFER) { + bufferSize = MAX_SIZE_FOR_SHARED_BUFFER; + } + sharedBuffer = new byte[(int) bufferSize]; } bufferAllocationCount = 0; } @@ -160,10 +164,7 @@ public class BytesColumnVector extends ColumnVector { * @return amount of buffer space currently allocated */ public int bufferSize() { - if (buffer == null) { - return 0; - } - return buffer.length; + return sharedBuffer == null ? 0 : sharedBuffer.length; } /** @@ -182,16 +183,13 @@ public class BytesColumnVector extends ColumnVector { * @param length length of source byte sequence */ public void setVal(int elementNum, byte[] sourceBuf, int start, int length) { - if ((nextFree + length) > buffer.length) { - increaseBufferSpace(length); - } + ensureValPreallocated(length); if (length > 0) { - System.arraycopy(sourceBuf, start, buffer, nextFree, length); + System.arraycopy(sourceBuf, start, currentValue, currentOffset, length); } - vector[elementNum] = buffer; - this.start[elementNum] = nextFree; + vector[elementNum] = currentValue; + this.start[elementNum] = currentOffset; this.length[elementNum] = length; - nextFree += length; } /** @@ -212,23 +210,31 @@ public class BytesColumnVector extends ColumnVector { } /** - * Preallocate space in the local buffer so the caller can fill in the value bytes themselves. + * Ensures that we have space allocated for the next value, which has size + * length bytes. + * + * Updates currentValue, currentOffset, and sharedBufferOffset for this value. * - * Always use with getValPreallocatedBytes, getValPreallocatedStart, and setValPreallocated. + * Always use before getValPreallocatedBytes, getValPreallocatedStart, + * and setValPreallocated. */ public void ensureValPreallocated(int length) { - if ((nextFree + length) > buffer.length) { - increaseBufferSpace(length); + if ((sharedBufferOffset + length) > sharedBuffer.length) { + currentValue = allocateBuffer(length); + } else { + currentValue = sharedBuffer; + currentOffset = sharedBufferOffset; + sharedBufferOffset += length; } } @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Expose internal rep for efficiency") public byte[] getValPreallocatedBytes() { - return buffer; + return currentValue; } public int getValPreallocatedStart() { - return nextFree; + return currentOffset; } /** @@ -237,10 +243,9 @@ public class BytesColumnVector extends ColumnVector { * @param length */ public void setValPreallocated(int elementNum, int length) { - vector[elementNum] = buffer; - this.start[elementNum] = nextFree; + vector[elementNum] = currentValue; + this.start[elementNum] = currentOffset; this.length[elementNum] = length; - nextFree += length; } /** @@ -258,73 +263,55 @@ public class BytesColumnVector extends ColumnVector { public void setConcat(int elementNum, byte[] leftSourceBuf, int leftStart, int leftLen, byte[] rightSourceBuf, int rightStart, int rightLen) { int newLen = leftLen + rightLen; - if ((nextFree + newLen) > buffer.length) { - increaseBufferSpace(newLen); - } - vector[elementNum] = buffer; - this.start[elementNum] = nextFree; + ensureValPreallocated(newLen); + vector[elementNum] = currentValue; + this.start[elementNum] = currentOffset; this.length[elementNum] = newLen; - System.arraycopy(leftSourceBuf, leftStart, buffer, nextFree, leftLen); - nextFree += leftLen; - System.arraycopy(rightSourceBuf, rightStart, buffer, nextFree, rightLen); - nextFree += rightLen; + System.arraycopy(leftSourceBuf, leftStart, currentValue, currentOffset, leftLen); + System.arraycopy(rightSourceBuf, rightStart, currentValue, + currentOffset + leftLen, rightLen); } /** - * Increase buffer space enough to accommodate next element. + * Allocate/reuse enough buffer space to accommodate next element. + * currentOffset is set to the first available byte in the returned array. + * If sharedBuffer is used, sharedBufferOffset is updated to point after the + * current record. + * * This uses an exponential increase mechanism to rapidly * increase buffer size to enough to hold all data. * As batches get re-loaded, buffer space allocated will quickly * stabilize. * * @param nextElemLength size of next element to be added + * @return the buffer to use for the next element */ - public void increaseBufferSpace(int nextElemLength) { - // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to - // a byte[] with sufficient space for the specified size. - // This will either point to smallBuffer, or to a newly allocated byte array for larger values. - - if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) { - // Larger allocations will be special-cased and will not use the normal buffer. - // buffer/nextFree will be set to a newly allocated array just for the current row. - // The next row will require another call to increaseBufferSpace() since this new buffer should be used up. - byte[] newBuffer = new byte[nextElemLength]; + private byte[] allocateBuffer(int nextElemLength) { + // If this is a large value or shared buffer is maxed out, allocate a + // single use buffer. Assumes that sharedBuffer length and + // MAX_SIZE_FOR_SHARED_BUFFER are powers of 2. + if (nextElemLength > MAX_SIZE_FOR_SMALL_ITEM || + sharedBufferOffset + nextElemLength >= MAX_SIZE_FOR_SHARED_BUFFER) { + // allocate a value for the next value ++bufferAllocationCount; - // If the buffer was pointing to smallBuffer, then nextFree keeps track of the current state - // of the free index for smallBuffer. We now need to save this value to smallBufferNextFree - // so we don't lose this. A bit of a weird dance here. - if (smallBuffer == buffer) { - smallBufferNextFree = nextFree; - } - buffer = newBuffer; - nextFree = 0; + currentOffset = 0; + return new byte[nextElemLength]; } else { - // This value should go into smallBuffer. - if (smallBuffer != buffer) { - // Previous row was for a large bytes value ( > MAX_SIZE_FOR_SMALL_BUFFER). - // Use smallBuffer if possible. - buffer = smallBuffer; - nextFree = smallBufferNextFree; - } - // smallBuffer might still be out of space - if ((nextFree + nextElemLength) > buffer.length) { - int newLength = smallBuffer.length * 2; + // sharedBuffer might still be out of space + if ((sharedBufferOffset + nextElemLength) > sharedBuffer.length) { + int newLength = sharedBuffer.length * 2; while (newLength < nextElemLength) { - if (newLength > 0) { - newLength *= 2; - } else { // integer overflow happened; maximize size of next smallBuffer - newLength = Integer.MAX_VALUE; - } + newLength *= 2; } - smallBuffer = new byte[newLength]; + sharedBuffer = new byte[newLength]; ++bufferAllocationCount; - smallBufferNextFree = 0; - // Update buffer - buffer = smallBuffer; - nextFree = 0; + sharedBufferOffset = 0; } + currentOffset = sharedBufferOffset; + sharedBufferOffset += nextElemLength; + return sharedBuffer; } } @@ -575,10 +562,12 @@ public class BytesColumnVector extends ColumnVector { public void shallowCopyTo(ColumnVector otherCv) { BytesColumnVector other = (BytesColumnVector)otherCv; super.shallowCopyTo(other); - other.nextFree = nextFree; + other.currentOffset = currentOffset; other.vector = vector; other.start = start; other.length = length; - other.buffer = buffer; + other.currentValue = currentValue; + other.sharedBuffer = sharedBuffer; + other.sharedBufferOffset = sharedBufferOffset; } } diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java index da89122..3b42d17 100644 --- a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java +++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java @@ -18,10 +18,16 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.junit.Test; -import static org.junit.Assert.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; public class TestBytesColumnVector { @Test @@ -35,33 +41,41 @@ public class TestBytesColumnVector { col.reset(); // Initial write (small value) - byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1); + byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 1); bytesWrittenToBytes1 += smallWriteSize; + assertEquals(0, col.start[0]); + assertEquals(smallWriteSize, col.length[0]); // Write a large value. This should use a different byte buffer rowIdx++; - byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2); - assertFalse(bytes1 == bytes2); + byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -1); + assertNotSame(bytes1, bytes2); + assertEquals(0, col.start[1]); + assertEquals(largeWriteSize, col.length[1]); // Another small write. smallBuffer should be re-used for this write rowIdx++; - byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1); + byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 2); bytesWrittenToBytes1 += smallWriteSize; - assertTrue(bytes1 == bytes3); + assertSame(bytes1, bytes3); + assertEquals(smallWriteSize, col.start[2]); + assertEquals(smallWriteSize, col.length[2]); // Write another large value. This should use a different byte buffer rowIdx++; - byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3); - assertFalse(bytes1 == bytes4); - assertFalse(bytes2 == bytes4); + byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -2); + assertNotSame(bytes1, bytes4); + assertNotSame(bytes2, bytes4); + assertEquals(0, col.start[3]); + assertEquals(largeWriteSize, col.length[3]); // Eventually enough small writes should result in another buffer getting created boolean gotNewBuffer = false; // Test is dependent on getting a new buffer within 1MB. // This may need to change as the implementation changes. - for (int i = 0; i < 1024; ++i) { + for (int i = 3; i < 1024; ++i) { rowIdx++; - byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1); + byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, i); if (currBytes == bytes1) { bytesWrittenToBytes1 += smallWriteSize; } else { @@ -74,16 +88,98 @@ public class TestBytesColumnVector { // All small writes to the first buffer should be in contiguous memory for (int i = 0; i < bytesWrittenToBytes1; ++i) { - assertEquals((byte) 1, bytes1[i]); + assertEquals((byte) (i / smallWriteSize + 1), bytes1[i]); + } + } + + /** + * Test the setVal, setConcat, and StringExpr.padRight methods. + */ + @Test + public void testConcatAndPadding() { + BytesColumnVector col = new BytesColumnVector(); + col.reset(); + byte[] prefix = "緑".getBytes(StandardCharsets.UTF_8); + + // fill the column with 'test' + for(int row=0; row < col.vector.length; ++row) { + col.setVal(row, prefix, 0, prefix.length); + } + for(int row=0; row < col.vector.length; ++row) { + assertEquals("row " + row, "緑", col.toString(row)); + } + + // pad out to 6 characters + for(int row=0; row < col.vector.length; ++row) { + StringExpr.padRight(col, row, col.vector[row], col.start[row], + col.length[row], 6); + } + for(int row=0; row < col.vector.length; ++row) { + assertEquals("row " + row, "緑 ", col.toString(row)); + } + + // concat the row digits + for(int row=0; row < col.vector.length; ++row) { + byte[] rowStr = Integer.toString(row).getBytes(StandardCharsets.UTF_8); + col.setConcat(row, col.vector[row], col.start[row], col.length[row], + rowStr, 0, rowStr.length); + } + for(int row=0; row < col.vector.length; ++row) { + assertEquals("row " + row, "緑 " + row, col.toString(row)); + } + + // We end up allocating 20k, so we should have expanded the small buffer + assertEquals(32 * 1024, col.bufferSize()); + } + + @Test + public void testBufferOverflow() { + BytesColumnVector col = new BytesColumnVector(2048); + col.reset(); + assertEquals(BytesColumnVector.DEFAULT_BUFFER_SIZE, col.bufferSize()); + + // pick a size below 1m so that we use the small buffer; + final int size = BytesColumnVector.MAX_SIZE_FOR_SMALL_ITEM - 1024; + + // run through once to expand the small value buffer + for(int row=0; row < col.vector.length; ++row) { + writeToBytesColumnVector(row, col, size, row); + } + // it should have resized a bunch of times + byte[] sharedBuffer = col.getValPreallocatedBytes(); + assertNotSame(sharedBuffer, col.vector[0]); + assertSame(sharedBuffer, col.vector[1024]); + + // reset the column, but make sure the buffer isn't reallocated + col.reset(); + assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize()); + + // fill up the vector now with the large buffer + for(int row=0; row < col.vector.length; ++row) { + writeToBytesColumnVector(row, col, size, row); + } + assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize()); + // Now the first 1025 rows should all be the shared buffer, + // because 1025 * size < MAX_SIZE_FOR_SMALL_BUFFER + for(int row=0; row < 1025; ++row) { + assertSame("row " + row, sharedBuffer, col.vector[row]); + assertEquals("row " + row, row * size, col.start[row]); + assertEquals("row " + row, size, col.length[row]); + } + // the rest should be custom buffers + for(int row=1025; row < col.vector.length; ++row) { + assertNotSame("row " + row, sharedBuffer, col.vector[row]); + assertEquals("row " + row, 0, col.start[row]); + assertEquals("row " + row, size, col.length[row]); } } // Write a value to the column vector, and return back the byte buffer used. - private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) { + private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, int val) { col.ensureValPreallocated(writeSize); byte[] bytes = col.getValPreallocatedBytes(); int startIdx = col.getValPreallocatedStart(); - Arrays.fill(bytes, startIdx, startIdx + writeSize, val); + Arrays.fill(bytes, startIdx, startIdx + writeSize, (byte) val); col.setValPreallocated(rowIdx, writeSize); return bytes; }