Repository: arrow Updated Branches: refs/heads/master a5362c2cb -> d06c49144
ARROW-399: ListVector.loadFieldBuffers ignores the ArrowFieldNode length metadata Author: Julien Le Dem <jul...@dremio.com> Closes #227 from julienledem/arrow_399 and squashes the following commits: 93a77cb [Julien Le Dem] set padding; add test 462a36c [Julien Le Dem] ARROW-399: ListVector.loadFieldBuffers ignores the ArrowFieldNode length metadata Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d06c4914 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d06c4914 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d06c4914 Branch: refs/heads/master Commit: d06c49144a60faa9af115e803694329e82623a5d Parents: a5362c2 Author: Julien Le Dem <jul...@dremio.com> Authored: Fri Dec 9 11:41:21 2016 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Fri Dec 9 11:41:21 2016 -0500 ---------------------------------------------------------------------- .../codegen/templates/FixedValueVectors.java | 2 + .../codegen/templates/NullableValueVectors.java | 62 +++++-------- .../src/main/codegen/templates/UnionVector.java | 2 + .../arrow/vector/BaseDataValueVector.java | 17 ++++ .../java/org/apache/arrow/vector/BitVector.java | 2 +- .../org/apache/arrow/vector/VectorLoader.java | 2 +- .../apache/arrow/vector/complex/ListVector.java | 2 + .../arrow/vector/TestVectorUnloadLoad.java | 92 ++++++++++++++++++-- 8 files changed, 136 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/FixedValueVectors.java b/java/vector/src/main/codegen/templates/FixedValueVectors.java index 7958222..be385d1 100644 --- a/java/vector/src/main/codegen/templates/FixedValueVectors.java +++ 
b/java/vector/src/main/codegen/templates/FixedValueVectors.java @@ -45,6 +45,8 @@ package org.apache.arrow.vector; public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class); + public static final int TYPE_WIDTH = ${type.width}; + private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index 2c4274c..6a9ce65 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -37,7 +37,7 @@ import java.util.Collections; import org.apache.arrow.flatbuf.Precision; /** - * Nullable${minor.class} implements a vector of values which could be null. Elements in the vector + * ${className} implements a vector of values which could be null. Elements in the vector * are first checked against a fixed length vector of boolean values. Then the element is retrieved * from the base class (if not null). 
* @@ -47,7 +47,7 @@ import org.apache.arrow.flatbuf.Precision; public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class); - private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this); + private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this); private final String bitsField = "$bits$"; private final String valuesField = "$values$"; @@ -67,7 +67,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type public ${className}(String name, BufferAllocator allocator, int precision, int scale) { super(name, allocator); - values = new ${minor.class}Vector(valuesField, allocator, precision, scale); + values = new ${valuesName}(valuesField, allocator, precision, scale); this.precision = precision; this.scale = scale; mutator = new Mutator(); @@ -81,7 +81,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type <#else> public ${className}(String name, BufferAllocator allocator) { super(name, allocator); - values = new ${minor.class}Vector(valuesField, allocator); + values = new ${valuesName}(valuesField, allocator); mutator = new Mutator(); accessor = new Accessor(); <#if minor.class == "TinyInt" || @@ -144,6 +144,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + <#if type.major = "VarLen"> + // variable width values: truncate offset vector buffer to size (#1) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.offsetVector.getBufferSizeFor(fieldNode.getLength() + 1)); + <#else> + // fixed width values truncate value vector to size (#1) + 
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.getBufferSizeFor(fieldNode.getLength())); + </#if> org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); bits.valueCount = fieldNode.getLength(); } @@ -229,13 +236,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type values.setInitialCapacity(numRecords); } -// @Override -// public SerializedField.Builder getMetadataBuilder() { -// return super.getMetadataBuilder() -// .addChild(bits.getMetadata()) -// .addChild(values.getMetadata()); -// } - @Override public void allocateNew() { if(!allocateNewSafe()){ @@ -329,20 +329,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type } </#if> - -// @Override -// public void load(SerializedField metadata, ArrowBuf buffer) { -// clear(); - // the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant) -// final SerializedField bitsField = metadata.getChild(0); -// bits.load(bitsField, buffer); -// -// final int capacity = buffer.capacity(); -// final int bitsLength = bitsField.getBufferLength(); -// final SerializedField valuesField = metadata.getChild(1); -// values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength)); -// } - @Override public TransferPair getTransferPair(BufferAllocator allocator){ return new TransferImpl(name, allocator); @@ -356,10 +342,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type @Override public TransferPair makeTransferPair(ValueVector to) { - return new TransferImpl((Nullable${minor.class}Vector) to); + return new TransferImpl((${className}) to); } - public void transferTo(Nullable${minor.class}Vector target){ + public void transferTo(${className} target){ bits.transferTo(target.bits); values.transferTo(target.values); <#if type.major == "VarLen"> @@ -368,7 +354,7 @@ public final class ${className} 
extends BaseDataValueVector implements <#if type clear(); } - public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) { + public void splitAndTransferTo(int startIndex, int length, ${className} target) { bits.splitAndTransferTo(startIndex, length, target.bits); values.splitAndTransferTo(startIndex, length, target.values); <#if type.major == "VarLen"> @@ -377,22 +363,22 @@ public final class ${className} extends BaseDataValueVector implements <#if type } private class TransferImpl implements TransferPair { - Nullable${minor.class}Vector to; + ${className} to; public TransferImpl(String name, BufferAllocator allocator){ <#if minor.class == "Decimal"> - to = new Nullable${minor.class}Vector(name, allocator, precision, scale); + to = new ${className}(name, allocator, precision, scale); <#else> - to = new Nullable${minor.class}Vector(name, allocator); + to = new ${className}(name, allocator); </#if> } - public TransferImpl(Nullable${minor.class}Vector to){ + public TransferImpl(${className} to){ this.to = to; } @Override - public Nullable${minor.class}Vector getTo(){ + public ${className} getTo(){ return to; } @@ -408,7 +394,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type @Override public void copyValueSafe(int fromIndex, int toIndex) { - to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this); + to.copyFromSafe(fromIndex, toIndex, ${className}.this); } } @@ -422,14 +408,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type return mutator; } - public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){ + public void copyFrom(int fromIndex, int thisIndex, ${className} from){ final Accessor fromAccessor = from.getAccessor(); if (!fromAccessor.isNull(fromIndex)) { mutator.set(thisIndex, fromAccessor.get(fromIndex)); } } - public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){ + public void 
copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from){ <#if type.major == "VarLen"> mutator.fillEmpties(thisIndex); </#if> @@ -437,7 +423,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type bits.getMutator().setSafe(thisIndex, 1); } - public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){ + public void copyFromSafe(int fromIndex, int thisIndex, ${className} from){ <#if type.major == "VarLen"> mutator.fillEmpties(thisIndex); </#if> @@ -640,7 +626,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type } public boolean isSafe(int outIndex) { - return outIndex < Nullable${minor.class}Vector.this.getValueCapacity(); + return outIndex < ${className}.this.getValueCapacity(); } <#assign fields = minor.fields!type.fields /> http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 4e68b68..18acdf4 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -103,6 +103,8 @@ public class UnionVector implements FieldVector { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + // truncate types vector buffer to size (#0) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 0, typeVector.getBufferSizeFor(fieldNode.getLength())); BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); this.valueCount = fieldNode.getLength(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java index 4c6d363..b7df8d1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java @@ -30,6 +30,9 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this + /** maximum extra size at the end of the buffer */ + private static final int MAX_BUFFER_PADDING = 64; + public static void load(ArrowFieldNode fieldNode, List<BufferBacked> vectors, List<ArrowBuf> buffers) { int expectedSize = vectors.size(); if (buffers.size() != expectedSize) { @@ -40,6 +43,20 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf } } + public static void truncateBufferBasedOnSize(List<ArrowBuf> buffers, int bufferIndex, int byteSize) { + if (bufferIndex >= buffers.size()) { + throw new IllegalArgumentException("no buffer at index " + bufferIndex + ": " + buffers); + } + ArrowBuf buffer = buffers.get(bufferIndex); + if (buffer.writerIndex() < byteSize) { + throw new IllegalArgumentException("can not truncate buffer to a larger size " + byteSize + ": " + buffer.writerIndex()); + } + if (buffer.writerIndex() - byteSize > MAX_BUFFER_PADDING) { + throw new IllegalArgumentException("Buffer too large to resize to " + byteSize + ": " + buffer.writerIndex()); + } + buffer.writerIndex(byteSize); + } + public static List<ArrowBuf> unload(List<BufferBacked> vectors) { List<ArrowBuf> result = new ArrayList<>(vectors.size()); for (BufferBacked vector : vectors) { http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java ---------------------------------------------------------------------- diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index 7ce1236..48da8e7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -68,7 +68,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe int remainder = count % 8; // set remaining bits if (remainder > 0) { - byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));; + byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7)); this.data.setByte(fullBytesCount, bitMask); } } else if (fieldNode.getNullCount() == fieldNode.getLength()) { http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 757f061..5c1176c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -82,7 +82,7 @@ public class VectorLoader { vector.loadFieldBuffers(fieldNode, ownBuffers); } catch (RuntimeException e) { throw new IllegalArgumentException("Could not load buffers for field " + - field + " error message" + e.getMessage(), e); + field + ". 
error message: " + e.getMessage(), e); } List<Field> children = field.getChildren(); if (children.size() > 0) { http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index e18f99f..461bdbc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -93,6 +93,8 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector { @Override public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + // variable width values: truncate offset vector buffer to size (#1) + org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, offsets.getBufferSizeFor(fieldNode.getLength() + 1)); BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers); } http://git-wip-us.apache.org/repos/asf/arrow/blob/d06c4914/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java ---------------------------------------------------------------------- diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 9dfe8d8..7a70ffd 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.arrow.vector.complex.MapVector; 
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.IntWriter; @@ -99,6 +101,79 @@ public class TestVectorUnloadLoad { } } + @Test + public void testUnloadLoadAddPadding() throws IOException { + int count = 10000; + Schema schema; + try ( + BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) { + + // write some data + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + ListWriter list = rootWriter.list("list"); + IntWriter intWriter = list.integer(); + for (int i = 0; i < count; i++) { + list.setPosition(i); + list.startList(); + for (int j = 0; j < i % 4 + 1; j++) { + intWriter.writeInt(i); + } + list.endList(); + } + writer.setValueCount(count); + + // unload it + FieldVector root = parent.getChild("root"); + schema = new Schema(root.getField().getChildren()); + VectorUnloader vectorUnloader = newVectorUnloader(root); + try ( + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); + VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); + ) { + List<ArrowBuf> oldBuffers = recordBatch.getBuffers(); + List<ArrowBuf> newBuffers = new ArrayList<>(); + for (ArrowBuf oldBuffer : oldBuffers) { + int l = oldBuffer.readableBytes(); + if (l % 64 != 0) { + // pad + l = l + 64 - l % 64; + } + ArrowBuf newBuffer = allocator.buffer(l); + for (int i = 
oldBuffer.readerIndex(); i < oldBuffer.writerIndex(); i++) { + newBuffer.setByte(i - oldBuffer.readerIndex(), oldBuffer.getByte(i)); + } + newBuffer.readerIndex(0); + newBuffer.writerIndex(l); + newBuffers.add(newBuffer); + } + + try (ArrowRecordBatch newBatch = new ArrowRecordBatch(recordBatch.getLength(), recordBatch.getNodes(), newBuffers);) { + // load it + VectorLoader vectorLoader = new VectorLoader(newRoot); + + vectorLoader.load(newBatch); + + FieldReader reader = newRoot.getVector("list").getReader(); + for (int i = 0; i < count; i++) { + reader.setPosition(i); + List<Integer> expected = new ArrayList<>(); + for (int j = 0; j < i % 4 + 1; j++) { + expected.add(i); + } + Assert.assertEquals(expected, reader.readObject()); + } + } + + for (ArrowBuf newBuf : newBuffers) { + newBuf.release(); + } + } + } + } + /** * The validity buffer can be empty if: * - all values are defined @@ -113,12 +188,17 @@ public class TestVectorUnloadLoad { )); int count = 10; ArrowBuf validity = allocator.getEmpty(); - ArrowBuf values = allocator.buffer(count * 4); // integers - for (int i = 0; i < count; i++) { - values.setInt(i * 4, i); + ArrowBuf[] values = new ArrowBuf[2]; + for (int i = 0; i < values.length; i++) { + ArrowBuf arrowBuf = allocator.buffer(count * 4); // integers + values[i] = arrowBuf; + for (int j = 0; j < count; j++) { + arrowBuf.setInt(j * 4, j); + } + arrowBuf.writerIndex(count * 4); } try ( - ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values, validity, values)); + ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1])); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator); ) { @@ -153,7 +233,9 @@ 
public class TestVectorUnloadLoad { assertFalse(intDefinedVector.getAccessor().isNull(count + 10)); assertEquals(1234, intDefinedVector.getAccessor().get(count + 10)); } finally { - values.release(); + for (ArrowBuf arrowBuf : values) { + arrowBuf.release(); + } } }