ARROW-264: File format. This is work in progress.
Author: Julien Le Dem <jul...@dremio.com> Closes #123 from julienledem/arrow_264_file_format and squashes the following commits: 252de6d [Julien Le Dem] remove outdated comment 04d797f [Julien Le Dem] maps are not nullable yet e8359b3 [Julien Le Dem] align on 8 byte boundaries; more tests 8b8b823 [Julien Le Dem] refactoring 31e95e6 [Julien Le Dem] fix list vector b824938 [Julien Le Dem] fix types; add licenses; more tests; more complex 2fd3bc1 [Julien Le Dem] cleanup 50fe680 [Julien Le Dem] nested support b0bf6bc [Julien Le Dem] cleanup 4247b1a [Julien Le Dem] fix whitespace d6a1788 [Julien Le Dem] refactoring 81863c5 [Julien Le Dem] fixed loader aa1b766 [Julien Le Dem] better test 2067e01 [Julien Le Dem] update format aacf61e [Julien Le Dem] fix pom b907aa5 [Julien Le Dem] simplify e43f26b [Julien Le Dem] add layout spec 0cc9718 [Julien Le Dem] add vector type ac6902a [Julien Le Dem] ARROW-264: File format 807db51 [Julien Le Dem] move information to schema f2f0596 [Julien Le Dem] Update FieldNode structure to be more explicit and reflect schema Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/803afeb5 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/803afeb5 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/803afeb5 Branch: refs/heads/master Commit: 803afeb502dcdd802fada2ed0d66c145546b8a78 Parents: ec51d56 Author: Julien Le Dem <jul...@dremio.com> Authored: Fri Aug 26 08:20:13 2016 -0700 Committer: Julien Le Dem <jul...@dremio.com> Committed: Fri Aug 26 08:20:13 2016 -0700 ---------------------------------------------------------------------- cpp/src/arrow/ipc/metadata-internal.cc | 1 + format/File.fbs | 28 ++ format/Message.fbs | 21 +- java/format/pom.xml | 1 + .../src/main/java/io/netty/buffer/ArrowBuf.java | 71 ++-- .../vector/src/main/codegen/data/ArrowTypes.tdd | 4 +- .../src/main/codegen/templates/ArrowType.java | 29 +- .../codegen/templates/NullableValueVectors.java | 49 ++- 
.../src/main/codegen/templates/UnionVector.java | 40 ++- .../arrow/vector/BaseDataValueVector.java | 38 ++- .../org/apache/arrow/vector/BufferBacked.java | 31 ++ .../org/apache/arrow/vector/FieldVector.java | 65 ++++ .../org/apache/arrow/vector/ValueVector.java | 6 +- .../org/apache/arrow/vector/VectorLoader.java | 99 ++++++ .../org/apache/arrow/vector/VectorUnloader.java | 78 +++++ .../org/apache/arrow/vector/ZeroVector.java | 39 ++- .../vector/complex/AbstractContainerVector.java | 21 +- .../arrow/vector/complex/AbstractMapVector.java | 42 ++- .../vector/complex/BaseRepeatedValueVector.java | 21 +- .../apache/arrow/vector/complex/ListVector.java | 58 +++- .../apache/arrow/vector/complex/MapVector.java | 59 +++- .../vector/complex/impl/ComplexWriterImpl.java | 2 +- .../vector/complex/impl/PromotableWriter.java | 3 +- .../apache/arrow/vector/file/ArrowBlock.java | 82 +++++ .../apache/arrow/vector/file/ArrowFooter.java | 144 ++++++++ .../apache/arrow/vector/file/ArrowReader.java | 151 +++++++++ .../apache/arrow/vector/file/ArrowWriter.java | 179 ++++++++++ .../vector/file/InvalidArrowFileException.java | 27 ++ .../apache/arrow/vector/schema/ArrowBuffer.java | 81 +++++ .../arrow/vector/schema/ArrowFieldNode.java | 53 +++ .../arrow/vector/schema/ArrowRecordBatch.java | 127 +++++++ .../arrow/vector/schema/ArrowVectorType.java | 47 +++ .../arrow/vector/schema/FBSerializable.java | 24 ++ .../arrow/vector/schema/FBSerializables.java | 37 +++ .../apache/arrow/vector/schema/TypeLayout.java | 208 ++++++++++++ .../arrow/vector/schema/VectorLayout.java | 93 ++++++ .../org/apache/arrow/vector/types/Types.java | 70 ++-- .../apache/arrow/vector/types/pojo/Field.java | 42 ++- .../apache/arrow/vector/types/pojo/Schema.java | 13 +- .../arrow/vector/TestVectorUnloadLoad.java | 89 +++++ .../ByteArrayReadableSeekableByteChannel.java | 80 +++++ .../apache/arrow/vector/file/TestArrowFile.java | 331 +++++++++++++++++++ .../arrow/vector/file/TestArrowFooter.java | 56 ++++ 
.../vector/file/TestArrowReaderWriter.java | 106 ++++++ .../apache/arrow/vector/pojo/TestConvert.java | 38 ++- 45 files changed, 2722 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 50db730..c921e4d 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -219,6 +219,7 @@ static Status FieldToFlatbuffer( RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data)); auto fb_children = fbb.CreateVector(children); + // TODO: produce the list of VectorTypes *offset = flatbuf::CreateField( fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary, fb_children); http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/File.fbs ---------------------------------------------------------------------- diff --git a/format/File.fbs b/format/File.fbs new file mode 100644 index 0000000..f7ad1e1 --- /dev/null +++ b/format/File.fbs @@ -0,0 +1,28 @@ +include "Message.fbs"; + +namespace org.apache.arrow.flatbuf; + +/// ---------------------------------------------------------------------- +/// Arrow File metadata +/// + +table Footer { + + schema: org.apache.arrow.flatbuf.Schema; + + dictionaries: [ Block ]; + + recordBatches: [ Block ]; +} + +struct Block { + + offset: long; + + metaDataLength: int; + + bodyLength: long; + +} + +root_type Footer; http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/Message.fbs ---------------------------------------------------------------------- diff --git a/format/Message.fbs b/format/Message.fbs index a78009b..b02f3fa 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -17,7 +17,7 @@ table Tuple { table List { } -enum 
UnionMode:int { Sparse, Dense } +enum UnionMode:short { Sparse, Dense } table Union { mode: UnionMode; @@ -28,7 +28,7 @@ table Int { is_signed: bool; } -enum Precision:int {SINGLE, DOUBLE} +enum Precision:short {SINGLE, DOUBLE} table FloatingPoint { precision: Precision; @@ -91,6 +91,17 @@ union Type { JSONScalar } +enum VectorType: short { + /// used in List type, Dense Union and variable length primitive types (String, Binary) + OFFSET, + /// fixed length primitive values + VALUES, + /// Bit vector indicating whether each value is null + VALIDITY, + /// Type vector used in Union type + TYPE +} + /// ---------------------------------------------------------------------- /// A field represents a named column in a record / row batch or child of a /// nested type. @@ -109,12 +120,16 @@ table Field { dictionary: long; // children apply only to Nested data types like Struct, List and Union children: [Field]; + /// the buffers produced for this type (as derived from the Type) + /// does not include children + /// each recordbatch will return instances of those Buffers. 
+ buffers: [ VectorType ]; } /// ---------------------------------------------------------------------- /// Endianness of the platform that produces the RecordBatch -enum Endianness:int { Little, Big } +enum Endianness:short { Little, Big } /// ---------------------------------------------------------------------- /// A Schema describes the columns in a row batch http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/format/pom.xml ---------------------------------------------------------------------- diff --git a/java/format/pom.xml b/java/format/pom.xml index cb11b5f..dc58975 100644 --- a/java/format/pom.xml +++ b/java/format/pom.xml @@ -106,6 +106,7 @@ <argument>-o</argument> <argument>target/generated-sources/</argument> <argument>../../format/Message.fbs</argument> + <argument>../../format/File.fbs</argument> </arguments> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java ---------------------------------------------------------------------- diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index bbec26a..d10f002 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -17,8 +17,6 @@ */ package io.netty.buffer; -import io.netty.util.internal.PlatformDependent; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -30,16 +28,18 @@ import java.nio.charset.Charset; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.arrow.memory.AllocationManager.BufferLedger; import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.BoundsChecking; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.BufferManager; -import 
org.apache.arrow.memory.AllocationManager.BufferLedger; -import org.apache.arrow.memory.BaseAllocator.Verbosity; import org.apache.arrow.memory.util.HistoricalLog; import com.google.common.base.Preconditions; +import io.netty.util.internal.PlatformDependent; + public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class); @@ -307,7 +307,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf order(ByteOrder endianness) { + public ArrowBuf order(ByteOrder endianness) { return this; } @@ -344,7 +344,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf slice() { + public ArrowBuf slice() { return slice(readerIndex(), readableBytes()); } @@ -467,7 +467,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf retain(int increment) { + public ArrowBuf retain(int increment) { Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment); if (isEmpty) { @@ -484,7 +484,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf retain() { + public ArrowBuf retain() { return retain(1); } @@ -535,49 +535,49 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf setShort(int index, int value) { + public ArrowBuf setShort(int index, int value) { chk(index, 2); PlatformDependent.putShort(addr(index), (short) value); return this; } @Override - public ByteBuf setInt(int index, int value) { + public ArrowBuf setInt(int index, int value) { chk(index, 4); PlatformDependent.putInt(addr(index), value); return this; } @Override - public ByteBuf setLong(int index, long value) { + public ArrowBuf setLong(int index, long value) { chk(index, 8); 
PlatformDependent.putLong(addr(index), value); return this; } @Override - public ByteBuf setChar(int index, int value) { + public ArrowBuf setChar(int index, int value) { chk(index, 2); PlatformDependent.putShort(addr(index), (short) value); return this; } @Override - public ByteBuf setFloat(int index, float value) { + public ArrowBuf setFloat(int index, float value) { chk(index, 4); PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value)); return this; } @Override - public ByteBuf setDouble(int index, double value) { + public ArrowBuf setDouble(int index, double value) { chk(index, 8); PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value)); return this; } @Override - public ByteBuf writeShort(int value) { + public ArrowBuf writeShort(int value) { ensure(2); PlatformDependent.putShort(addr(writerIndex), (short) value); writerIndex += 2; @@ -585,7 +585,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf writeInt(int value) { + public ArrowBuf writeInt(int value) { ensure(4); PlatformDependent.putInt(addr(writerIndex), value); writerIndex += 4; @@ -593,7 +593,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf writeLong(long value) { + public ArrowBuf writeLong(long value) { ensure(8); PlatformDependent.putLong(addr(writerIndex), value); writerIndex += 8; @@ -601,7 +601,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf writeChar(int value) { + public ArrowBuf writeChar(int value) { ensure(2); PlatformDependent.putShort(addr(writerIndex), (short) value); writerIndex += 2; @@ -609,7 +609,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf writeFloat(float value) { + public ArrowBuf writeFloat(float value) { ensure(4); PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); 
writerIndex += 4; @@ -617,7 +617,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf writeDouble(double value) { + public ArrowBuf writeDouble(double value) { ensure(8); PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); writerIndex += 8; @@ -625,19 +625,19 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + public ArrowBuf getBytes(int index, byte[] dst, int dstIndex, int length) { udle.getBytes(index + offset, dst, dstIndex, length); return this; } @Override - public ByteBuf getBytes(int index, ByteBuffer dst) { + public ArrowBuf getBytes(int index, ByteBuffer dst) { udle.getBytes(index + offset, dst); return this; } @Override - public ByteBuf setByte(int index, int value) { + public ArrowBuf setByte(int index, int value) { chk(index, 1); PlatformDependent.putByte(addr(index), (byte) value); return this; @@ -699,13 +699,13 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { udle.getBytes(index + offset, dst, dstIndex, length); return this; } @Override - public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOException { udle.getBytes(index + offset, out, length); return this; } @@ -724,12 +724,12 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { udle.setBytes(index + offset, src, srcIndex, length); return this; } - public ByteBuf setBytes(int index, 
ByteBuffer src, int srcIndex, int length) { + public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) { if (src.isDirect()) { checkIndex(index, length); PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index, @@ -749,13 +749,13 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } @Override - public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + public ArrowBuf setBytes(int index, byte[] src, int srcIndex, int length) { udle.setBytes(index + offset, src, srcIndex, length); return this; } @Override - public ByteBuf setBytes(int index, ByteBuffer src) { + public ArrowBuf setBytes(int index, ByteBuffer src) { udle.setBytes(index + offset, src); return this; } @@ -860,4 +860,17 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable { } } + @Override + public ArrowBuf readerIndex(int readerIndex) { + super.readerIndex(readerIndex); + return this; + } + + @Override + public ArrowBuf writerIndex(int writerIndex) { + super.writerIndex(writerIndex); + return this; + } + + } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/data/ArrowTypes.tdd ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd b/java/vector/src/main/codegen/data/ArrowTypes.tdd index 4ab7f85..2ecad3d 100644 --- a/java/vector/src/main/codegen/data/ArrowTypes.tdd +++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd @@ -30,7 +30,7 @@ }, { name: "Union", - fields: [] + fields: [{name: "mode", type: short}] }, { name: "Int", @@ -38,7 +38,7 @@ }, { name: "FloatingPoint", - fields: [{name: precision, type: int}] + fields: [{name: precision, type: short}] }, { name: "Utf8", http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/ArrowType.java 
---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/ArrowType.java b/java/vector/src/main/codegen/templates/ArrowType.java index 6dfaf21..29dee20 100644 --- a/java/vector/src/main/codegen/templates/ArrowType.java +++ b/java/vector/src/main/codegen/templates/ArrowType.java @@ -24,9 +24,8 @@ import java.util.Objects; <@pp.dropOutputFile /> <@pp.changeOutputFile name="/org/apache/arrow/vector/types/pojo/ArrowType.java" /> - - <#include "/@includes/license.ftl" /> + package org.apache.arrow.vector.types.pojo; import com.google.flatbuffers.FlatBufferBuilder; @@ -38,7 +37,13 @@ public abstract class ArrowType { public abstract byte getTypeType(); public abstract int getType(FlatBufferBuilder builder); + public abstract <T> T accept(ArrowTypeVisitor<T> visitor); + public static interface ArrowTypeVisitor<T> { + <#list arrowTypes.types as type> + T visit(${type.name} type); + </#list> + } <#list arrowTypes.types as type> <#assign name = type.name> @@ -70,9 +75,14 @@ public abstract class ArrowType { @Override public int getType(FlatBufferBuilder builder) { + <#list type.fields as field> + <#if field.type == "String"> + int ${field.name} = builder.createString(this.${field.name}); + </#if> + </#list> org.apache.arrow.flatbuf.${type.name}.start${type.name}(builder); <#list type.fields as field> - org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, <#if field.type == "String">builder.createString(${field.name})<#else>${field.name}</#if>); + org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, ${field.name}); </#list> return org.apache.arrow.flatbuf.${type.name}.end${type.name}(builder); } @@ -83,6 +93,14 @@ public abstract class ArrowType { } </#list> + public String toString() { + return "${name}{" + <#list fields as field> + + ", " + ${field.name} + </#list> + + "}"; + } + @Override public int hashCode() { return Objects.hash(<#list type.fields as 
field>${field.name}<#if field_has_next>, </#if></#list>); @@ -102,6 +120,11 @@ public abstract class ArrowType { </#list> </#if> } + + @Override + public <T> T accept(ArrowTypeVisitor<T> visitor) { + return visitor.visit(this); + } } </#list> http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java index df50897..6b1aa04 100644 --- a/java/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java @@ -29,6 +29,9 @@ package org.apache.arrow.vector; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import java.util.Collections; + <#include "/@includes/vv_imports.ftl" /> /** @@ -39,7 +42,7 @@ package org.apache.arrow.vector; * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker. 
*/ @SuppressWarnings("unused") -public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{ +public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class); private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this); @@ -54,6 +57,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type private final Mutator mutator; private final Accessor accessor; + private final List<BufferBacked> innerVectors; + <#if minor.class == "Decimal"> private final int precision; private final int scale; @@ -66,6 +71,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type mutator = new Mutator(); accessor = new Accessor(); field = new Field(name, true, new Decimal(precision, scale), null); + innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList( + bits, + values + )); } <#else> public ${className}(String name, BufferAllocator allocator) { @@ -88,9 +97,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type <#elseif minor.class == "Time"> field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null); <#elseif minor.class == "Float4"> - field = new Field(name, true, new FloatingPoint(0), null); + field = new Field(name, true, new FloatingPoint(org.apache.arrow.flatbuf.Precision.SINGLE), null); <#elseif minor.class == "Float8"> - field = new Field(name, true, new FloatingPoint(1), null); + field = new Field(name, true, new FloatingPoint(org.apache.arrow.flatbuf.Precision.DOUBLE), null); <#elseif minor.class == "TimeStamp"> field = new Field(name, true, new 
org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(""), null); <#elseif minor.class == "IntervalDay"> @@ -104,10 +113,44 @@ public final class ${className} extends BaseDataValueVector implements <#if type <#elseif minor.class == "Bit"> field = new Field(name, true, new Bool(), null); </#if> + innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList( + bits, + <#if type.major = "VarLen"> + values.offsetVector, + </#if> + values + )); } </#if> @Override + public List<BufferBacked> getFieldInnerVectors() { + return innerVectors; + } + + @Override + public void initializeChildrenFromFields(List<Field> children) { + if (!children.isEmpty()) { + throw new IllegalArgumentException("primitive type vector ${className} can not have children: " + children); + } + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return Collections.emptyList(); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); + // TODO: do something with the sizes in fieldNode? 
+ } + + public List<ArrowBuf> getFieldBuffers() { + return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors()); + } + + @Override public Field getField() { return field; } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/UnionVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 1fef490..72125fa 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -42,6 +42,10 @@ import java.util.ArrayList; import java.util.Iterator; import org.apache.arrow.vector.complex.impl.ComplexCopier; import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.schema.ArrowFieldNode; + +import static org.apache.arrow.flatbuf.UnionMode.Sparse; + /* * This class is generated using freemarker and the ${.template_name} template. @@ -57,7 +61,7 @@ import org.apache.arrow.vector.util.CallBack; * For performance reasons, UnionVector stores a cached reference to each subtype vector, to avoid having to do the map lookup * each time the vector is accessed. 
*/ -public class UnionVector implements ValueVector { +public class UnionVector implements FieldVector { private String name; private BufferAllocator allocator; @@ -95,6 +99,34 @@ public class UnionVector implements ValueVector { return MinorType.UNION; } + @Override + public void initializeChildrenFromFields(List<Field> children) { + getMap().initializeChildrenFromFields(children); + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return getMap().getChildrenFromFields(); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + // TODO + throw new UnsupportedOperationException(); + } + + @Override + public List<ArrowBuf> getFieldBuffers() { + // TODO + throw new UnsupportedOperationException(); + } + + @Override + public List<BufferBacked> getFieldInnerVectors() { + // TODO + throw new UnsupportedOperationException(); + } + public MapVector getMap() { if (mapVector == null) { int vectorCount = internalMap.size(); @@ -203,7 +235,7 @@ public class UnionVector implements ValueVector { for (ValueVector v : internalMap.getChildren()) { childFields.add(v.getField()); } - return new Field(name, true, new ArrowType.Union(), childFields); + return new Field(name, true, new ArrowType.Union(Sparse), childFields); } @Override @@ -237,10 +269,10 @@ public class UnionVector implements ValueVector { copyFrom(inIndex, outIndex, from); } - public ValueVector addVector(ValueVector v) { + public FieldVector addVector(FieldVector v) { String name = v.getMinorType().name().toLowerCase(); Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name)); - final ValueVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass()); + final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass()); v.makeTransferPair(newVector).transfer(); internalMap.putChild(name, newVector); if (callBack != null) { 
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java index 05b7cf1..c22258d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java @@ -17,15 +17,38 @@ */ package org.apache.arrow.vector; -import io.netty.buffer.ArrowBuf; +import java.util.ArrayList; +import java.util.List; import org.apache.arrow.memory.BufferAllocator; +import io.netty.buffer.ArrowBuf; + -public abstract class BaseDataValueVector extends BaseValueVector { +public abstract class BaseDataValueVector extends BaseValueVector implements BufferBacked { protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this + public static void load(List<BufferBacked> vectors, List<ArrowBuf> buffers) { + int expectedSize = vectors.size(); + if (buffers.size() != expectedSize) { + throw new IllegalArgumentException("Illegal buffer count, expected " + expectedSize + ", got: " + buffers.size()); + } + for (int i = 0; i < expectedSize; i++) { + vectors.get(i).load(buffers.get(i)); + } + } + + public static List<ArrowBuf> unload(List<BufferBacked> vectors) { + List<ArrowBuf> result = new ArrayList<>(vectors.size()); + for (BufferBacked vector : vectors) { + result.add(vector.unLoad()); + } + return result; + } + + // TODO: Nullable vectors extend BaseDataValueVector but do not use the data field + // We should fix the inheritance tree protected ArrowBuf data; public BaseDataValueVector(String name, BufferAllocator allocator) { @@ -82,6 +105,17 @@ public abstract class BaseDataValueVector extends BaseValueVector { return data; } + @Override + public void load(ArrowBuf 
data) { + this.data.release(); + this.data = data.retain(allocator); + } + + @Override + public ArrowBuf unLoad() { + return this.data.readerIndex(0); + } + /** * This method has a similar effect of allocateNew() without actually clearing and reallocating * the value vector. The purpose is to move the value vector to a "mutate" state http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java new file mode 100644 index 0000000..d1c262d --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector; + +import io.netty.buffer.ArrowBuf; + +/** + * Content is backed by a buffer and can be loaded/unloaded + */ +public interface BufferBacked { + + void load(ArrowBuf data); + + ArrowBuf unLoad(); + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java new file mode 100644 index 0000000..b28433c --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import java.util.List; + +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.types.pojo.Field; + +import io.netty.buffer.ArrowBuf; + +/** + * A vector corresponding to a Field in the schema + * It has inner vectors backed by buffers (validity, offsets, data, ...) 
+ */ +public interface FieldVector extends ValueVector { + + /** + * Initializes the child vectors + * to be later loaded with loadBuffers + * @param children the schema + */ + void initializeChildrenFromFields(List<Field> children); + + /** + * the returned list is the same size as the list passed to initializeChildrenFromFields + * @return the children according to schema (empty for primitive types) + */ + List<FieldVector> getChildrenFromFields(); + + /** + * loads data in the vectors + * (ownBuffers must be the same size as getFieldInnerVectors()) + * @param fieldNode the fieldNode + * @param ownBuffers the buffers for this Field (own buffers only, children not included) + */ + void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers); + + /** + * (same size as getFieldInnerVectors() since it is their content) + * @return the buffers containing the data for this vector (ready for reading) + */ + List<ArrowBuf> getFieldBuffers(); + + /** + * @return the inner vectors for this field as defined by the TypeLayout + */ + List<BufferBacked> getFieldInnerVectors(); + +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java index 35321c9..ba7790e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java @@ -19,14 +19,14 @@ package org.apache.arrow.vector; import java.io.Closeable; -import io.netty.buffer.ArrowBuf; - import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.types.Types.MinorType; -import org.apache.arrow.vector.util.TransferPair; import 
org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.TransferPair; + +import io.netty.buffer.ArrowBuf; /** * An abstraction that is used to store a sequence of values in an individual column. http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java new file mode 100644 index 0000000..58ac68b --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.schema.VectorLayout; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import com.google.common.collect.Iterators; + +import io.netty.buffer.ArrowBuf; + +/** + * Loads buffers into vectors + */ +public class VectorLoader { + private final List<FieldVector> fieldVectors; + private final List<Field> fields; + + /** + * will create children in root based on schema + * @param schema the expected schema + * @param root the root to add vectors to based on schema + */ + public VectorLoader(Schema schema, FieldVector root) { + super(); + this.fields = schema.getFields(); + root.initializeChildrenFromFields(fields); + this.fieldVectors = root.getChildrenFromFields(); + if (this.fieldVectors.size() != fields.size()) { + throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + fields.size()); + } + } + + /** + * Loads the record batch in the vectors + * will not close the record batch + * @param recordBatch + */ + public void load(ArrowRecordBatch recordBatch) { + Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator(); + Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator(); + for (int i = 0; i < fields.size(); ++i) { + Field field = fields.get(i); + FieldVector fieldVector = fieldVectors.get(i); + loadBuffers(fieldVector, field, buffers, nodes); + } + if (nodes.hasNext() || buffers.hasNext()) { + throw new IllegalArgumentException("not all nodes and buffers where consumed. 
nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers)); + } + } + + private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) { + ArrowFieldNode fieldNode = nodes.next(); + List<VectorLayout> typeLayout = field.getTypeLayout().getVectors(); + List<ArrowBuf> ownBuffers = new ArrayList<>(typeLayout.size()); + for (int j = 0; j < typeLayout.size(); j++) { + ownBuffers.add(buffers.next()); + } + try { + vector.loadFieldBuffers(fieldNode, ownBuffers); + } catch (RuntimeException e) { + throw new IllegalArgumentException("Could not load buffers for field " + field); + } + List<Field> children = field.getChildren(); + if (children.size() > 0) { + List<FieldVector> childrenFromFields = vector.getChildrenFromFields(); + checkArgument(children.size() == childrenFromFields.size(), "should have as many children as in the schema: found " + childrenFromFields.size() + " expected " + children.size()); + for (int i = 0; i < childrenFromFields.size(); i++) { + Field child = children.get(i); + FieldVector fieldVector = childrenFromFields.get(i); + loadBuffers(fieldVector, child, buffers, nodes); + } + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java new file mode 100644 index 0000000..e4d37bf --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.vector.ValueVector.Accessor; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.schema.ArrowVectorType; +import org.apache.arrow.vector.types.pojo.Schema; + +import io.netty.buffer.ArrowBuf; + +public class VectorUnloader { + + private final Schema schema; + private final int valueCount; + private final List<FieldVector> vectors; + + public VectorUnloader(FieldVector parent) { + super(); + this.schema = new Schema(parent.getField().getChildren()); + this.valueCount = parent.getAccessor().getValueCount(); + this.vectors = parent.getChildrenFromFields(); + } + + public Schema getSchema() { + return schema; + } + + public ArrowRecordBatch getRecordBatch() { + List<ArrowFieldNode> nodes = new ArrayList<>(); + List<ArrowBuf> buffers = new ArrayList<>(); + for (FieldVector vector : vectors) { + appendNodes(vector, nodes, buffers); + } + return new ArrowRecordBatch(valueCount, nodes, buffers); + } + + private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) { + Accessor accessor = vector.getAccessor(); + int nullCount = 0; + // TODO: should not have to do that + // we can do that a lot more efficiently (for example with Long.bitCount(i)) + for 
(int i = 0; i < accessor.getValueCount(); i++) { + if (accessor.isNull(i)) { + nullCount ++; + } + } + nodes.add(new ArrowFieldNode(accessor.getValueCount(), nullCount)); + List<ArrowBuf> fieldBuffers = vector.getFieldBuffers(); + List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes(); + if (fieldBuffers.size() != expectedBuffers.size()) { + throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers); + } + buffers.addAll(fieldBuffers); + for (FieldVector child : vector.getChildrenFromFields()) { + appendNodes(child, nodes, buffers); + } + } +} http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java index 705a24b..c2482ad 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java @@ -17,25 +17,23 @@ */ package org.apache.arrow.vector; -import com.google.flatbuffers.FlatBufferBuilder; -import io.netty.buffer.ArrowBuf; - import java.util.Collections; import java.util.Iterator; +import java.util.List; -import org.apache.arrow.flatbuf.Type; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.complex.impl.NullReader; import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType.Null; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.TransferPair; -import com.google.common.collect.Iterators; +import io.netty.buffer.ArrowBuf; -public 
class ZeroVector implements ValueVector { +public class ZeroVector implements FieldVector { public final static ZeroVector INSTANCE = new ZeroVector(); private final String name = "[DEFAULT]"; @@ -175,4 +173,33 @@ public class ZeroVector implements ValueVector { public FieldReader getReader() { return NullReader.INSTANCE; } + + @Override + public void initializeChildrenFromFields(List<Field> children) { + if (!children.isEmpty()) { + throw new IllegalArgumentException("Zero vector has no children"); + } + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return Collections.emptyList(); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + if (!ownBuffers.isEmpty()) { + throw new IllegalArgumentException("Zero vector has no buffers"); + } + } + + @Override + public List<ArrowBuf> getFieldBuffers() { + return Collections.emptyList(); + } + + @Override + public List<BufferBacked> getFieldInnerVectors() { + return Collections.emptyList(); + } } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java index ed77975..2f68886 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java @@ -17,22 +17,13 @@ */ package org.apache.arrow.vector.complex; -import java.util.Collection; - -import javax.annotation.Nullable; - -import org.apache.arrow.flatbuf.Field; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.vector.FieldVector; import 
org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.util.CallBack; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - /** * Base class for composite vectors. * @@ -65,8 +56,8 @@ public abstract class AbstractContainerVector implements ValueVector { /** * Returns a {@link org.apache.arrow.vector.ValueVector} corresponding to the given field name if exists or null. */ - public ValueVector getChild(String name) { - return getChild(name, ValueVector.class); + public FieldVector getChild(String name) { + return getChild(name, FieldVector.class); } /** @@ -81,7 +72,7 @@ public abstract class AbstractContainerVector implements ValueVector { protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz) { if (clazz.isAssignableFrom(v.getClass())) { - return (T) v; + return clazz.cast(v); } throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Arrow doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName())); } @@ -94,10 +85,10 @@ public abstract class AbstractContainerVector implements ValueVector { public abstract int size(); // add a new vector with the input MajorType or return the existing vector if we already added one with the same type - public abstract <T extends ValueVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale); + public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... 
precisionScale); // return the child vector with the input name - public abstract <T extends ValueVector> T getChild(String name, Class<T> clazz); + public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz); // return the child vector's ordinal in the composite container public abstract VectorWithOrdinal getChildVectorWithOrdinal(String name); http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java index 5964f80..23b4997 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java @@ -17,23 +17,24 @@ */ package org.apache.arrow.vector.complex; -import com.google.common.collect.ImmutableList; -import io.netty.buffer.ArrowBuf; - import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; -import org.apache.arrow.flatbuf.Field; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.MapWithOrdinal; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import io.netty.buffer.ArrowBuf; + /* * Base class for MapVectors. 
Currently used by RepeatedMapVector and MapVector */ @@ -41,7 +42,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class); // Maintains a map with key as field name and value is the vector itself - private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>(); + private final MapWithOrdinal<String, FieldVector> vectors = new MapWithOrdinal<>(); protected AbstractMapVector(String name, BufferAllocator allocator, CallBack callBack) { super(name, allocator, callBack); @@ -109,19 +110,19 @@ public abstract class AbstractMapVector extends AbstractContainerVector { * @return resultant {@link org.apache.arrow.vector.ValueVector} */ @Override - public <T extends ValueVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) { + public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) { final ValueVector existing = getChild(name); boolean create = false; if (existing == null) { create = true; } else if (clazz.isAssignableFrom(existing.getClass())) { - return (T) existing; + return clazz.cast(existing); } else if (nullFilled(existing)) { existing.clear(); create = true; } if (create) { - final T vector = (T) minorType.getNewVector(name, allocator, callBack, precisionScale); + final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale)); putChild(name, vector); if (callBack!=null) { callBack.doWork(); @@ -153,7 +154,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { * field name if exists or null. 
*/ @Override - public <T extends ValueVector> T getChild(String name, Class<T> clazz) { + public <T extends FieldVector> T getChild(String name, Class<T> clazz) { final ValueVector v = vectors.get(name.toLowerCase()); if (v == null) { return null; @@ -161,12 +162,25 @@ public abstract class AbstractMapVector extends AbstractContainerVector { return typeify(v, clazz); } + protected ValueVector add(String name, MinorType minorType, int... precisionScale) { + final ValueVector existing = getChild(name); + if (existing != null) { + throw new IllegalStateException(String.format("Vector already exists: Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType)); + } + FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale); + putChild(name, vector); + if (callBack!=null) { + callBack.doWork(); + } + return vector; + } + /** * Inserts the vector with the given name if it does not exist else replaces it with the new value. * * Note that this method does not enforce any vector type check nor throws a schema change exception. */ - protected void putChild(String name, ValueVector vector) { + protected void putChild(String name, FieldVector vector) { putVector(name, vector); } @@ -175,7 +189,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { * @param name field name * @param vector vector to be inserted */ - protected void putVector(String name, ValueVector vector) { + protected void putVector(String name, FieldVector vector) { final ValueVector old = vectors.put( Preconditions.checkNotNull(name, "field name cannot be null").toLowerCase(), Preconditions.checkNotNull(vector, "vector cannot be null") @@ -189,9 +203,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector { /** * Returns a sequence of underlying child vectors. 
*/ - protected List<ValueVector> getChildren() { + protected List<FieldVector> getChildren() { int size = vectors.size(); - List<ValueVector> children = new ArrayList<>(); + List<FieldVector> children = new ArrayList<>(); for (int i = 0; i < size; i++) { children.add(vectors.getByOrdinal(i)); } @@ -216,7 +230,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector { @Override public Iterator<ValueVector> iterator() { - return vectors.values().iterator(); + return Collections.<ValueVector>unmodifiableCollection(vectors.values()).iterator(); } /** http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index 4226274..517d20c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -17,8 +17,6 @@ */ package org.apache.arrow.vector.complex; -import io.netty.buffer.ArrowBuf; - import java.util.Collections; import java.util.Iterator; @@ -26,29 +24,32 @@ import org.apache.arrow.flatbuf.Type; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.AddOrGetResult; import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.util.SchemaChangeRuntimeException; import com.google.common.base.Preconditions; import com.google.common.collect.ObjectArrays; -import org.apache.arrow.vector.types.Types.MinorType; 
-import org.apache.arrow.vector.util.SchemaChangeRuntimeException; + +import io.netty.buffer.ArrowBuf; public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector { - public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE; + public final static FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE; public final static String OFFSETS_VECTOR_NAME = "$offsets$"; public final static String DATA_VECTOR_NAME = "$data$"; protected final UInt4Vector offsets; - protected ValueVector vector; + protected FieldVector vector; protected BaseRepeatedValueVector(String name, BufferAllocator allocator) { this(name, allocator, DEFAULT_DATA_VECTOR); } - protected BaseRepeatedValueVector(String name, BufferAllocator allocator, ValueVector vector) { + protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector) { super(name, allocator); this.offsets = new UInt4Vector(OFFSETS_VECTOR_NAME, allocator); this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null"); @@ -83,7 +84,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements } @Override - public ValueVector getDataVector() { + public FieldVector getDataVector() { return vector; } @@ -121,7 +122,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements @Override public Iterator<ValueVector> iterator() { - return Collections.singleton(getDataVector()).iterator(); + return Collections.<ValueVector>singleton(getDataVector()).iterator(); } @Override @@ -167,7 +168,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements return new AddOrGetResult<>((T)vector, created); } - protected void replaceDataVector(ValueVector v) { + protected void replaceDataVector(FieldVector v) { vector.clear(); vector = v; } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java 
---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index c6c6b09..2984c36 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -18,15 +18,18 @@ ******************************************************************************/ package org.apache.arrow.vector.complex; -import com.google.common.collect.ImmutableList; -import com.google.flatbuffers.FlatBufferBuilder; -import io.netty.buffer.ArrowBuf; +import static java.util.Collections.singletonList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; import org.apache.arrow.vector.AddOrGetResult; +import org.apache.arrow.vector.BaseDataValueVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.UInt1Vector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.ValueVector; @@ -36,18 +39,24 @@ import org.apache.arrow.vector.complex.impl.UnionListReader; import org.apache.arrow.vector.complex.impl.UnionListWriter; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.FieldWriter; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.CallBack; import org.apache.arrow.vector.util.JsonStringArrayList; import org.apache.arrow.vector.util.TransferPair; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ObjectArrays; -public class ListVector extends 
BaseRepeatedValueVector { +import io.netty.buffer.ArrowBuf; + +public class ListVector extends BaseRepeatedValueVector implements FieldVector { - UInt4Vector offsets; + final UInt4Vector offsets; final UInt1Vector bits; + private final List<BufferBacked> innerVectors; private Mutator mutator = new Mutator(); private Accessor accessor = new Accessor(); private UnionListWriter writer; @@ -57,12 +66,46 @@ public class ListVector extends BaseRepeatedValueVector { public ListVector(String name, BufferAllocator allocator, CallBack callBack) { super(name, allocator); this.bits = new UInt1Vector("$bits$", allocator); - offsets = getOffsetVector(); + this.offsets = getOffsetVector(); + this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets)); this.writer = new UnionListWriter(this); this.reader = new UnionListReader(this); this.callBack = callBack; } + @Override + public void initializeChildrenFromFields(List<Field> children) { + if (children.size() != 1) { + throw new IllegalArgumentException("Lists have only one child. 
Found: " + children); + } + Field field = children.get(0); + MinorType minorType = Types.getMinorTypeForArrowType(field.getType()); + AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType); + if (!addOrGetVector.isCreated()) { + throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector()); + } + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return singletonList(getDataVector()); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); + } + + @Override + public List<ArrowBuf> getFieldBuffers() { + return BaseDataValueVector.unload(getFieldInnerVectors()); + } + + @Override + public List<BufferBacked> getFieldInnerVectors() { + return innerVectors; + } + public UnionListWriter getWriter() { return writer; } @@ -86,7 +129,7 @@ public class ListVector extends BaseRepeatedValueVector { } @Override - public ValueVector getDataVector() { + public FieldVector getDataVector() { return vector; } @@ -298,4 +341,5 @@ public class ListVector extends BaseRepeatedValueVector { bits.getMutator().setValueCount(valueCount); } } + } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java index 0cb613e..e369658 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java @@ -17,10 +17,10 @@ */ package org.apache.arrow.vector.complex; -import io.netty.buffer.ArrowBuf; - import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import 
java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,13 +28,17 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BaseDataValueVector; import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.holders.ComplexHolder; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.Types.MinorType; -import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.ArrowType.Tuple; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.CallBack; @@ -45,7 +49,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; -public class MapVector extends AbstractMapVector { +import io.netty.buffer.ArrowBuf; + +public class MapVector extends AbstractMapVector implements FieldVector { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class); private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this); @@ -53,6 +59,9 @@ public class MapVector extends AbstractMapVector { private final Mutator mutator = new Mutator(); int valueCount; + // TODO: validity vector + private final List<BufferBacked> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList()); + public MapVector(String name, BufferAllocator allocator, CallBack callBack){ super(name, allocator, callBack); } @@ -120,7 +129,7 @@ public class MapVector extends AbstractMapVector { int expectedSize = getBufferSize(); int actualSize = super.getBufferSize(); - 
Preconditions.checkArgument(expectedSize == actualSize); + Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize); return super.getBuffers(clear); } @@ -159,7 +168,7 @@ public class MapVector extends AbstractMapVector { this.to.ephPair = null; int i = 0; - ValueVector vector; + FieldVector vector; for (String child:from.getChildFieldNames()) { int preSize = to.size(); vector = from.getChild(child); @@ -175,7 +184,7 @@ public class MapVector extends AbstractMapVector { // (This is similar to what happens in ScanBatch where the children cannot be added till they are // read). To take care of this, we ensure that the hashCode of the MaterializedField does not // include the hashCode of the children but is based only on MaterializedField$key. - final ValueVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass()); + final FieldVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass()); if (allocate && to.size() != preSize) { newVector.allocateNew(); } @@ -315,13 +324,45 @@ public class MapVector extends AbstractMapVector { @Override public void close() { - final Collection<ValueVector> vectors = getChildren(); - for (final ValueVector v : vectors) { + final Collection<FieldVector> vectors = getChildren(); + for (final FieldVector v : vectors) { v.close(); } vectors.clear(); + valueCount = 0; super.close(); } + + @Override + public void initializeChildrenFromFields(List<Field> children) { + for (Field field : children) { + MinorType minorType = Types.getMinorTypeForArrowType(field.getType()); + FieldVector vector = (FieldVector)this.add(field.getName(), minorType); + vector.initializeChildrenFromFields(field.getChildren()); + } + } + + @Override + public List<FieldVector> getChildrenFromFields() { + return getChildren(); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) { + BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); + // 
TODO: something with fieldNode? + } + + @Override + public List<ArrowBuf> getFieldBuffers() { + return BaseDataValueVector.unload(getFieldInnerVectors()); + } + + @Override + public List<BufferBacked> getFieldInnerVectors() { + return innerVectors; + } + } http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java index 4d2adfb..89bfefc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java @@ -22,9 +22,9 @@ import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StateTool; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; import com.google.common.base.Preconditions; -import org.apache.arrow.vector.types.pojo.Field; public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class); http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java index 586b128..c282688 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java +++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java @@ -17,6 +17,7 @@ */ package org.apache.arrow.vector.complex.impl; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.complex.AbstractMapVector; @@ -129,7 +130,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter { } else if (listVector != null) { unionVector = listVector.promoteToUnion(); } - unionVector.addVector(tp.getTo()); + unionVector.addVector((FieldVector)tp.getTo()); writer = new UnionWriter(unionVector); writer.setPosition(idx()); for (int i = 0; i < idx(); i++) { http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java ---------------------------------------------------------------------- diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java new file mode 100644 index 0000000..90fb02b --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.vector.schema.FBSerializable;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * Immutable description of one block of a file: the offset at which it starts,
+ * the length of its metadata and the length of its body.
+ */
+public class ArrowBlock implements FBSerializable {
+
+  private final long offset;
+  private final int metadataLength;
+  private final long bodyLength;
+
+  /**
+   * @param offset position of the block
+   * @param metadataLength length of the metadata part of the block
+   * @param bodyLength length of the body part of the block
+   */
+  public ArrowBlock(long offset, int metadataLength, long bodyLength) {
+    this.offset = offset;
+    this.metadataLength = metadataLength;
+    this.bodyLength = bodyLength;
+  }
+
+  /** @return the offset given at construction */
+  public long getOffset() {
+    return offset;
+  }
+
+  /** @return the metadata length given at construction */
+  public int getMetadataLength() {
+    return metadataLength;
+  }
+
+  /** @return the body length given at construction */
+  public long getBodyLength() {
+    return bodyLength;
+  }
+
+  /**
+   * Writes this block as a flatbuffer {@code Block} struct.
+   *
+   * @param builder the builder to write into
+   * @return the value returned by {@code Block.createBlock}
+   */
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    return Block.createBlock(builder, offset, metadataLength, bodyLength);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32));
+    result = prime * result + metadataLength;
+    result = prime * result + (int) (offset ^ (offset >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ArrowBlock other = (ArrowBlock) obj;
+    return bodyLength == other.bodyLength
+        && metadataLength == other.metadataLength
+        && offset == other.offset;
+  }
+
+  /**
+   * Lists the three fields so that messages built by concatenating a block
+   * show its contents rather than an identity hash.
+   */
+  @Override
+  public String toString() {
+    return "ArrowBlock [offset=" + offset
+        + ", metadataLength=" + metadataLength
+        + ", bodyLength=" + bodyLength + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java new
file mode 100644
index 0000000..01e175b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Objects;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * The footer of a file: a schema plus the blocks describing the dictionaries
+ * and the record batches. Can be built from, and written back to, the
+ * flatbuffer {@link Footer} representation.
+ */
+public class ArrowFooter implements FBSerializable {
+
+  private final Schema schema;
+
+  private final List<ArrowBlock> dictionaries;
+
+  private final List<ArrowBlock> recordBatches;
+
+  /**
+   * @param schema the schema stored in the footer
+   * @param dictionaries the dictionary blocks, in order
+   * @param recordBatches the record batch blocks, in order
+   */
+  public ArrowFooter(Schema schema, List<ArrowBlock> dictionaries, List<ArrowBlock> recordBatches) {
+    this.schema = schema;
+    this.dictionaries = dictionaries;
+    this.recordBatches = recordBatches;
+  }
+
+  /**
+   * Builds the footer from its flatbuffer representation.
+   *
+   * @param footer the flatbuffer footer to convert
+   */
+  public ArrowFooter(Footer footer) {
+    this(
+        Schema.convertSchema(footer.schema()),
+        dictionaries(footer),
+        recordBatches(footer)
+        );
+  }
+
+  /** Copies the fields of a flatbuffer block accessor into an {@link ArrowBlock}. */
+  private static ArrowBlock toArrowBlock(Block block) {
+    return new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength());
+  }
+
+  /** Converts every record batch block of the flatbuffer footer, keeping their order. */
+  private static List<ArrowBlock> recordBatches(Footer footer) {
+    List<ArrowBlock> recordBatches = new ArrayList<>();
+    // a single accessor object is handed to every lookup
+    Block tempBlock = new Block();
+    int recordBatchesLength = footer.recordBatchesLength();
+    for (int i = 0; i < recordBatchesLength; i++) {
+      recordBatches.add(toArrowBlock(footer.recordBatches(tempBlock, i)));
+    }
+    return recordBatches;
+  }
+
+  /** Converts every dictionary block of the flatbuffer footer, keeping their order. */
+  private static List<ArrowBlock> dictionaries(Footer footer) {
+    List<ArrowBlock> dictionaries = new ArrayList<>();
+    // a single accessor object is handed to every lookup
+    Block tempBlock = new Block();
+    int dictionariesLength = footer.dictionariesLength();
+    for (int i = 0; i < dictionariesLength; i++) {
+      dictionaries.add(toArrowBlock(footer.dictionaries(tempBlock, i)));
+    }
+    return dictionaries;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public List<ArrowBlock> getDictionaries() {
+    return dictionaries;
+  }
+
+  public List<ArrowBlock> getRecordBatches() {
+    return recordBatches;
+  }
+
+  /**
+   * Writes the schema, the dictionary blocks and the record batch blocks as a
+   * flatbuffer {@link Footer}.
+   *
+   * @param builder the builder to write into
+   * @return the value returned by {@code Footer.endFooter}
+   */
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    int schemaIndex = schema.getSchema(builder);
+    Footer.startDictionariesVector(builder, dictionaries.size());
+    int dicsOffset = endVector(builder, dictionaries);
+    Footer.startRecordBatchesVector(builder, recordBatches.size());
+    int rbsOffset = endVector(builder, recordBatches);
+    Footer.startFooter(builder);
+    Footer.addSchema(builder, schemaIndex);
+    Footer.addDictionaries(builder, dicsOffset);
+    Footer.addRecordBatches(builder, rbsOffset);
+    return Footer.endFooter(builder);
+  }
+
+  /**
+   * Writes the blocks into the vector that the caller has just started and ends it.
+   * FlatBufferBuilder fills its buffer from the back towards the front, so the
+   * structs are written last-to-first; writing them first-to-last would make the
+   * vector read back in reverse order.
+   */
+  private static int endVector(FlatBufferBuilder builder, List<ArrowBlock> blocks) {
+    ListIterator<ArrowBlock> it = blocks.listIterator(blocks.size());
+    while (it.hasPrevious()) {
+      it.previous().writeTo(builder);
+    }
+    return builder.endVector();
+  }
+
+  @Override
+  public int hashCode() {
+    // same 31-based combination, in the same field order, as the hand-written version
+    return Objects.hash(dictionaries, recordBatches, schema);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ArrowFooter other = (ArrowFooter) obj;
+    return Objects.equals(dictionaries, other.dictionaries)
+        && Objects.equals(recordBatches, other.recordBatches)
+        && Objects.equals(schema, other.schema);
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
new file mode 100644
index 0000000..bbcd3e9
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Buffer;
+import org.apache.arrow.flatbuf.FieldNode;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Reads the footer and the record batches of a file through a
+ * {@link SeekableByteChannel}. Closing the reader closes the channel.
+ */
+public class ArrowReader implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
+
+  // explicit charset: the magic bytes must not depend on the platform default encoding
+  private static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.UTF_8);
+
+  private final SeekableByteChannel in;
+
+  private final BufferAllocator allocator;
+
+  // cached by readFooter() after the first successful read
+  private ArrowFooter footer;
+
+  /**
+   * @param in the channel to read from
+   * @param allocator used to allocate the buffers holding record batch data
+   */
+  public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
+    this.in = in;
+    this.allocator = allocator;
+  }
+
+  /**
+   * Reads exactly {@code l} bytes from the channel into {@code buffer}, starting at its
+   * current writer index, and advances the writer index by the number of bytes read.
+   *
+   * @return the number of bytes read, always {@code l}
+   * @throws IllegalStateException if the channel ends before {@code l} bytes were read
+   */
+  private int readFully(ArrowBuf buffer, int l) throws IOException {
+    int start = buffer.writerIndex();
+    int n = readFully(buffer.nioBuffer(start, l));
+    if (n != l) {
+      throw new IllegalStateException(n + " != " + l);
+    }
+    // advance relative to where writing started, not to the absolute index n
+    buffer.writerIndex(start + n);
+    return n;
+  }
+
+  /**
+   * Reads from the channel until {@code buffer} is full or the channel is exhausted,
+   * then flips the buffer.
+   *
+   * @return the number of bytes actually read
+   */
+  private int readFully(ByteBuffer buffer) throws IOException {
+    int total = 0;
+    while (buffer.hasRemaining()) {
+      int n = in.read(buffer);
+      if (n < 0) {
+        // end of stream: the -1 marker must not be counted as bytes read
+        break;
+      }
+      total += n;
+    }
+    buffer.flip();
+    return total;
+  }
+
+  /**
+   * Fills {@code buffer} completely from the channel.
+   *
+   * @throws IllegalStateException if the channel ends before the buffer is full
+   */
+  private void readExactly(ByteBuffer buffer) throws IOException {
+    int expected = buffer.remaining();
+    int n = readFully(buffer);
+    if (n != expected) {
+      throw new IllegalStateException(n + " != " + expected);
+    }
+  }
+
+  /** Decodes the first four bytes of {@code bytes} as a little-endian int. */
+  private static int bytesToInt(byte[] bytes) {
+    return ((bytes[3] & 255) << 24)
+        + ((bytes[2] & 255) << 16)
+        + ((bytes[1] & 255) << 8)
+        + (bytes[0] & 255);
+  }
+
+  /**
+   * Reads the footer from the end of the channel: the last bytes are a 4-byte footer
+   * length followed by the magic, and the footer itself sits right before them.
+   * The result is cached, so later calls do not touch the channel.
+   *
+   * @return the footer
+   * @throws InvalidArrowFileException if the file is too small, the trailing magic is
+   *         missing or the footer length is not plausible
+   * @throws IllegalStateException if the channel ends before the expected bytes were read
+   */
+  public ArrowFooter readFooter() throws IOException {
+    if (footer == null) {
+      long fileSize = in.size();
+      if (fileSize <= (MAGIC.length * 2 + 4)) {
+        throw new InvalidArrowFileException("file too small: " + fileSize);
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(4 + MAGIC.length);
+      long footerLengthOffset = fileSize - buffer.remaining();
+      in.position(footerLengthOffset);
+      readExactly(buffer);
+      byte[] array = buffer.array();
+      if (!Arrays.equals(MAGIC, Arrays.copyOfRange(array, 4, array.length))) {
+        throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
+      }
+      int footerLength = bytesToInt(array);
+      // widen to long so that a huge declared length cannot wrap around and pass the check
+      if (footerLength <= 0 || (long) footerLength + MAGIC.length * 2 + 4 > fileSize) {
+        throw new InvalidArrowFileException("invalid footer length: " + footerLength);
+      }
+      long footerOffset = footerLengthOffset - footerLength;
+      LOGGER.debug("Footer starts at {}, length: {}", footerOffset, footerLength);
+      ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
+      in.position(footerOffset);
+      readExactly(footerBuffer);
+      Footer footerFB = Footer.getRootAsFooter(footerBuffer);
+      this.footer = new ArrowFooter(footerFB);
+    }
+    return footer;
+  }
+
+  // TODO: read dictionaries
+
+  /**
+   * Reads the record batch described by {@code recordBatchBlock}: metadata and body are
+   * loaded into one buffer, and the body is sliced into one buffer per entry of the
+   * batch metadata.
+   *
+   * @param recordBatchBlock where the record batch is located in the channel
+   * @return the record batch
+   * @throws InvalidArrowFileException if a length is negative or the block does not fit
+   *         in a single buffer
+   * @throws IllegalStateException if the channel ends before the whole block was read
+   */
+  public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
+    LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
+        recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength());
+    int metadataLength = recordBatchBlock.getMetadataLength();
+    long bodyLength = recordBatchBlock.getBodyLength();
+    // checked before narrowing: a sum above Integer.MAX_VALUE would otherwise be truncated,
+    // possibly to a positive size that does not match the block
+    if (metadataLength < 0 || bodyLength < 0 || bodyLength > Integer.MAX_VALUE - metadataLength) {
+      throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
+    }
+    int l = metadataLength + (int) bodyLength;
+    final ArrowBuf buffer = allocator.buffer(l);
+    LOGGER.debug("allocated buffer {}", buffer);
+    try {
+      in.position(recordBatchBlock.getOffset());
+      readFully(buffer, l);
+      RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
+      int nodesLength = recordBatchFB.nodesLength();
+      final ArrowBuf body = buffer.slice(metadataLength, (int) bodyLength);
+      List<ArrowFieldNode> nodes = new ArrayList<>();
+      for (int i = 0; i < nodesLength; ++i) {
+        FieldNode node = recordBatchFB.nodes(i);
+        nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+      }
+      List<ArrowBuf> buffers = new ArrayList<>();
+      int buffersLength = recordBatchFB.buffersLength();
+      for (int i = 0; i < buffersLength; ++i) {
+        Buffer bufferFB = recordBatchFB.buffers(i);
+        LOGGER.debug("Buffer in RecordBatch at {}, length: {}", bufferFB.offset(), bufferFB.length());
+        ArrowBuf vectorBuffer = body.slice((int) bufferFB.offset(), (int) bufferFB.length());
+        buffers.add(vectorBuffer);
+      }
+      // NOTE(review): as before, the buffer is released right after the batch is built, so
+      // the batch is presumably expected to retain the slices it keeps -- confirm against
+      // ArrowRecordBatch
+      return new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
+    } finally {
+      // also reached when reading or decoding fails, so the allocation is never leaked
+      LOGGER.debug("released buffer {}", buffer);
+      buffer.release();
+    }
+  }
+
+  /** Closes the underlying channel. */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+}