ARROW-264: File format

This is work in progress

Author: Julien Le Dem <jul...@dremio.com>

Closes #123 from julienledem/arrow_264_file_format and squashes the following 
commits:

252de6d [Julien Le Dem] remove outdated comment
04d797f [Julien Le Dem] maps are not nullable yet
e8359b3 [Julien Le Dem] align on 8 byte boundaries; more tests
8b8b823 [Julien Le Dem] refactoring
31e95e6 [Julien Le Dem] fix list vector
b824938 [Julien Le Dem] fix types; add licenses; more tests; more complex
2fd3bc1 [Julien Le Dem] cleanup
50fe680 [Julien Le Dem] nested support
b0bf6bc [Julien Le Dem] cleanup
4247b1a [Julien Le Dem] fix whitespace
d6a1788 [Julien Le Dem] refactoring
81863c5 [Julien Le Dem] fixed loader
aa1b766 [Julien Le Dem] better test
2067e01 [Julien Le Dem] update format
aacf61e [Julien Le Dem] fix pom
b907aa5 [Julien Le Dem] simplify
e43f26b [Julien Le Dem] add layout spec
0cc9718 [Julien Le Dem] add vector type
ac6902a [Julien Le Dem] ARROW-264: File format
807db51 [Julien Le Dem] move information to schema
f2f0596 [Julien Le Dem] Update FieldNode structure to be more explicit and 
reflect schema


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/803afeb5
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/803afeb5
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/803afeb5

Branch: refs/heads/master
Commit: 803afeb502dcdd802fada2ed0d66c145546b8a78
Parents: ec51d56
Author: Julien Le Dem <jul...@dremio.com>
Authored: Fri Aug 26 08:20:13 2016 -0700
Committer: Julien Le Dem <jul...@dremio.com>
Committed: Fri Aug 26 08:20:13 2016 -0700

----------------------------------------------------------------------
 cpp/src/arrow/ipc/metadata-internal.cc          |   1 +
 format/File.fbs                                 |  28 ++
 format/Message.fbs                              |  21 +-
 java/format/pom.xml                             |   1 +
 .../src/main/java/io/netty/buffer/ArrowBuf.java |  71 ++--
 .../vector/src/main/codegen/data/ArrowTypes.tdd |   4 +-
 .../src/main/codegen/templates/ArrowType.java   |  29 +-
 .../codegen/templates/NullableValueVectors.java |  49 ++-
 .../src/main/codegen/templates/UnionVector.java |  40 ++-
 .../arrow/vector/BaseDataValueVector.java       |  38 ++-
 .../org/apache/arrow/vector/BufferBacked.java   |  31 ++
 .../org/apache/arrow/vector/FieldVector.java    |  65 ++++
 .../org/apache/arrow/vector/ValueVector.java    |   6 +-
 .../org/apache/arrow/vector/VectorLoader.java   |  99 ++++++
 .../org/apache/arrow/vector/VectorUnloader.java |  78 +++++
 .../org/apache/arrow/vector/ZeroVector.java     |  39 ++-
 .../vector/complex/AbstractContainerVector.java |  21 +-
 .../arrow/vector/complex/AbstractMapVector.java |  42 ++-
 .../vector/complex/BaseRepeatedValueVector.java |  21 +-
 .../apache/arrow/vector/complex/ListVector.java |  58 +++-
 .../apache/arrow/vector/complex/MapVector.java  |  59 +++-
 .../vector/complex/impl/ComplexWriterImpl.java  |   2 +-
 .../vector/complex/impl/PromotableWriter.java   |   3 +-
 .../apache/arrow/vector/file/ArrowBlock.java    |  82 +++++
 .../apache/arrow/vector/file/ArrowFooter.java   | 144 ++++++++
 .../apache/arrow/vector/file/ArrowReader.java   | 151 +++++++++
 .../apache/arrow/vector/file/ArrowWriter.java   | 179 ++++++++++
 .../vector/file/InvalidArrowFileException.java  |  27 ++
 .../apache/arrow/vector/schema/ArrowBuffer.java |  81 +++++
 .../arrow/vector/schema/ArrowFieldNode.java     |  53 +++
 .../arrow/vector/schema/ArrowRecordBatch.java   | 127 +++++++
 .../arrow/vector/schema/ArrowVectorType.java    |  47 +++
 .../arrow/vector/schema/FBSerializable.java     |  24 ++
 .../arrow/vector/schema/FBSerializables.java    |  37 +++
 .../apache/arrow/vector/schema/TypeLayout.java  | 208 ++++++++++++
 .../arrow/vector/schema/VectorLayout.java       |  93 ++++++
 .../org/apache/arrow/vector/types/Types.java    |  70 ++--
 .../apache/arrow/vector/types/pojo/Field.java   |  42 ++-
 .../apache/arrow/vector/types/pojo/Schema.java  |  13 +-
 .../arrow/vector/TestVectorUnloadLoad.java      |  89 +++++
 .../ByteArrayReadableSeekableByteChannel.java   |  80 +++++
 .../apache/arrow/vector/file/TestArrowFile.java | 331 +++++++++++++++++++
 .../arrow/vector/file/TestArrowFooter.java      |  56 ++++
 .../vector/file/TestArrowReaderWriter.java      | 106 ++++++
 .../apache/arrow/vector/pojo/TestConvert.java   |  38 ++-
 45 files changed, 2722 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc 
b/cpp/src/arrow/ipc/metadata-internal.cc
index 50db730..c921e4d 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -219,6 +219,7 @@ static Status FieldToFlatbuffer(
   RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, 
&type_data));
   auto fb_children = fbb.CreateVector(children);
 
+  // TODO: produce the list of VectorTypes
   *offset = flatbuf::CreateField(
       fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary,
       fb_children);

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/File.fbs
----------------------------------------------------------------------
diff --git a/format/File.fbs b/format/File.fbs
new file mode 100644
index 0000000..f7ad1e1
--- /dev/null
+++ b/format/File.fbs
@@ -0,0 +1,28 @@
+include "Message.fbs";
+
+namespace org.apache.arrow.flatbuf;
+
+/// ----------------------------------------------------------------------
+/// Arrow File metadata
+///
+
+table Footer {
+
+  schema: org.apache.arrow.flatbuf.Schema;
+
+  dictionaries: [ Block ];
+
+  recordBatches: [ Block ];
+}
+
+struct Block {
+
+  offset: long;
+
+  metaDataLength: int;
+
+  bodyLength: long;
+
+}
+
+root_type Footer;

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/Message.fbs
----------------------------------------------------------------------
diff --git a/format/Message.fbs b/format/Message.fbs
index a78009b..b02f3fa 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -17,7 +17,7 @@ table Tuple {
 table List {
 }
 
-enum UnionMode:int { Sparse, Dense }
+enum UnionMode:short { Sparse, Dense }
 
 table Union {
   mode: UnionMode;
@@ -28,7 +28,7 @@ table Int {
   is_signed: bool;
 }
 
-enum Precision:int {SINGLE, DOUBLE}
+enum Precision:short {SINGLE, DOUBLE}
 
 table FloatingPoint {
   precision: Precision;
@@ -91,6 +91,17 @@ union Type {
   JSONScalar
 }
 
+enum VectorType: short {
+  /// used in List type Dense Union and variable length primitive types 
(String, Binary)
+  OFFSET,
+  /// fixed length primitive values
+  VALUES,
+  /// Bit vector indicated if each value is null
+  VALIDITY,
+  /// Type vector used in Union type
+  TYPE
+}
+
 /// ----------------------------------------------------------------------
 /// A field represents a named column in a record / row batch or child of a
 /// nested type.
@@ -109,12 +120,16 @@ table Field {
   dictionary: long;
   // children apply only to Nested data types like Struct, List and Union
   children: [Field];
+  /// the buffers produced for this type (as derived from the Type)
+  /// does not include children
+  /// each recordbatch will return instances of those Buffers.
+  buffers: [ VectorType ];
 }
 
 /// ----------------------------------------------------------------------
 /// Endianness of the platform that produces the RecordBatch
 
-enum Endianness:int { Little, Big }
+enum Endianness:short { Little, Big }
 
 /// ----------------------------------------------------------------------
 /// A Schema describes the columns in a row batch

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/format/pom.xml
----------------------------------------------------------------------
diff --git a/java/format/pom.xml b/java/format/pom.xml
index cb11b5f..dc58975 100644
--- a/java/format/pom.xml
+++ b/java/format/pom.xml
@@ -106,6 +106,7 @@
               <argument>-o</argument>
               <argument>target/generated-sources/</argument>
               <argument>../../format/Message.fbs</argument>
+              <argument>../../format/File.fbs</argument>
             </arguments>
           </configuration>
         </execution>

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java 
b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index bbec26a..d10f002 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -17,8 +17,6 @@
  */
 package io.netty.buffer;
 
-import io.netty.util.internal.PlatformDependent;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,16 +28,18 @@ import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.arrow.memory.AllocationManager.BufferLedger;
 import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BaseAllocator.Verbosity;
 import org.apache.arrow.memory.BoundsChecking;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.BufferManager;
-import org.apache.arrow.memory.AllocationManager.BufferLedger;
-import org.apache.arrow.memory.BaseAllocator.Verbosity;
 import org.apache.arrow.memory.util.HistoricalLog;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.util.internal.PlatformDependent;
+
 public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
 
@@ -307,7 +307,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf order(ByteOrder endianness) {
+  public ArrowBuf order(ByteOrder endianness) {
     return this;
   }
 
@@ -344,7 +344,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf slice() {
+  public ArrowBuf slice() {
     return slice(readerIndex(), readableBytes());
   }
 
@@ -467,7 +467,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf retain(int increment) {
+  public ArrowBuf retain(int increment) {
     Preconditions.checkArgument(increment > 0, "retain(%d) argument is not 
positive", increment);
 
     if (isEmpty) {
@@ -484,7 +484,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf retain() {
+  public ArrowBuf retain() {
     return retain(1);
   }
 
@@ -535,49 +535,49 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf setShort(int index, int value) {
+  public ArrowBuf setShort(int index, int value) {
     chk(index, 2);
     PlatformDependent.putShort(addr(index), (short) value);
     return this;
   }
 
   @Override
-  public ByteBuf setInt(int index, int value) {
+  public ArrowBuf setInt(int index, int value) {
     chk(index, 4);
     PlatformDependent.putInt(addr(index), value);
     return this;
   }
 
   @Override
-  public ByteBuf setLong(int index, long value) {
+  public ArrowBuf setLong(int index, long value) {
     chk(index, 8);
     PlatformDependent.putLong(addr(index), value);
     return this;
   }
 
   @Override
-  public ByteBuf setChar(int index, int value) {
+  public ArrowBuf setChar(int index, int value) {
     chk(index, 2);
     PlatformDependent.putShort(addr(index), (short) value);
     return this;
   }
 
   @Override
-  public ByteBuf setFloat(int index, float value) {
+  public ArrowBuf setFloat(int index, float value) {
     chk(index, 4);
     PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
     return this;
   }
 
   @Override
-  public ByteBuf setDouble(int index, double value) {
+  public ArrowBuf setDouble(int index, double value) {
     chk(index, 8);
     PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
     return this;
   }
 
   @Override
-  public ByteBuf writeShort(int value) {
+  public ArrowBuf writeShort(int value) {
     ensure(2);
     PlatformDependent.putShort(addr(writerIndex), (short) value);
     writerIndex += 2;
@@ -585,7 +585,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf writeInt(int value) {
+  public ArrowBuf writeInt(int value) {
     ensure(4);
     PlatformDependent.putInt(addr(writerIndex), value);
     writerIndex += 4;
@@ -593,7 +593,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf writeLong(long value) {
+  public ArrowBuf writeLong(long value) {
     ensure(8);
     PlatformDependent.putLong(addr(writerIndex), value);
     writerIndex += 8;
@@ -601,7 +601,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf writeChar(int value) {
+  public ArrowBuf writeChar(int value) {
     ensure(2);
     PlatformDependent.putShort(addr(writerIndex), (short) value);
     writerIndex += 2;
@@ -609,7 +609,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf writeFloat(float value) {
+  public ArrowBuf writeFloat(float value) {
     ensure(4);
     PlatformDependent.putInt(addr(writerIndex), 
Float.floatToRawIntBits(value));
     writerIndex += 4;
@@ -617,7 +617,7 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf writeDouble(double value) {
+  public ArrowBuf writeDouble(double value) {
     ensure(8);
     PlatformDependent.putLong(addr(writerIndex), 
Double.doubleToRawLongBits(value));
     writerIndex += 8;
@@ -625,19 +625,19 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+  public ArrowBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
     udle.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
-  public ByteBuf getBytes(int index, ByteBuffer dst) {
+  public ArrowBuf getBytes(int index, ByteBuffer dst) {
     udle.getBytes(index + offset, dst);
     return this;
   }
 
   @Override
-  public ByteBuf setByte(int index, int value) {
+  public ArrowBuf setByte(int index, int value) {
     chk(index, 1);
     PlatformDependent.putByte(addr(index), (byte) value);
     return this;
@@ -699,13 +699,13 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+  public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
     udle.getBytes(index + offset, dst, dstIndex, length);
     return this;
   }
 
   @Override
-  public ByteBuf getBytes(int index, OutputStream out, int length) throws 
IOException {
+  public ArrowBuf getBytes(int index, OutputStream out, int length) throws 
IOException {
     udle.getBytes(index + offset, out, length);
     return this;
   }
@@ -724,12 +724,12 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+  public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
     udle.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
-  public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) 
{
+  public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int 
length) {
     if (src.isDirect()) {
       checkIndex(index, length);
       PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) 
+ srcIndex, this.memoryAddress() + index,
@@ -749,13 +749,13 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
   }
 
   @Override
-  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+  public ArrowBuf setBytes(int index, byte[] src, int srcIndex, int length) {
     udle.setBytes(index + offset, src, srcIndex, length);
     return this;
   }
 
   @Override
-  public ByteBuf setBytes(int index, ByteBuffer src) {
+  public ArrowBuf setBytes(int index, ByteBuffer src) {
     udle.setBytes(index + offset, src);
     return this;
   }
@@ -860,4 +860,17 @@ public final class ArrowBuf extends AbstractByteBuf 
implements AutoCloseable {
     }
   }
 
+  @Override
+  public ArrowBuf readerIndex(int readerIndex) {
+    super.readerIndex(readerIndex);
+    return this;
+  }
+
+  @Override
+  public ArrowBuf writerIndex(int writerIndex) {
+    super.writerIndex(writerIndex);
+    return this;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/data/ArrowTypes.tdd
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd 
b/java/vector/src/main/codegen/data/ArrowTypes.tdd
index 4ab7f85..2ecad3d 100644
--- a/java/vector/src/main/codegen/data/ArrowTypes.tdd
+++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd
@@ -30,7 +30,7 @@
     },
     {
       name: "Union",
-      fields: []
+      fields: [{name: "mode", type: short}]
     },
     {
       name: "Int",
@@ -38,7 +38,7 @@
     },
     {
       name: "FloatingPoint",
-      fields: [{name: precision, type: int}]
+      fields: [{name: precision, type: short}]
     },
     {
       name: "Utf8",

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/ArrowType.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/ArrowType.java 
b/java/vector/src/main/codegen/templates/ArrowType.java
index 6dfaf21..29dee20 100644
--- a/java/vector/src/main/codegen/templates/ArrowType.java
+++ b/java/vector/src/main/codegen/templates/ArrowType.java
@@ -24,9 +24,8 @@ import java.util.Objects;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile 
name="/org/apache/arrow/vector/types/pojo/ArrowType.java" />
-
-
 <#include "/@includes/license.ftl" />
+
 package org.apache.arrow.vector.types.pojo;
 
 import com.google.flatbuffers.FlatBufferBuilder;
@@ -38,7 +37,13 @@ public abstract class ArrowType {
 
   public abstract byte getTypeType();
   public abstract int getType(FlatBufferBuilder builder);
+  public abstract <T> T accept(ArrowTypeVisitor<T> visitor);
 
+  public static interface ArrowTypeVisitor<T> {
+  <#list arrowTypes.types as type>
+    T visit(${type.name} type);
+  </#list>
+  }
 
   <#list arrowTypes.types as type>
   <#assign name = type.name>
@@ -70,9 +75,14 @@ public abstract class ArrowType {
 
     @Override
     public int getType(FlatBufferBuilder builder) {
+      <#list type.fields as field>
+      <#if field.type == "String">
+      int ${field.name} = builder.createString(this.${field.name});
+      </#if>
+      </#list>
       org.apache.arrow.flatbuf.${type.name}.start${type.name}(builder);
       <#list type.fields as field>
-      
org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, <#if 
field.type == 
"String">builder.createString(${field.name})<#else>${field.name}</#if>);
+      
org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, 
${field.name});
       </#list>
       return org.apache.arrow.flatbuf.${type.name}.end${type.name}(builder);
     }
@@ -83,6 +93,14 @@ public abstract class ArrowType {
     }
     </#list>
 
+    public String toString() {
+      return "${name}{"
+      <#list fields as field>
+      + ", " + ${field.name}
+      </#list>
+      + "}";
+    }
+
     @Override
     public int hashCode() {
       return Objects.hash(<#list type.fields as field>${field.name}<#if 
field_has_next>, </#if></#list>);
@@ -102,6 +120,11 @@ public abstract class ArrowType {
       </#list>
       </#if>
     }
+
+    @Override
+    public <T> T accept(ArrowTypeVisitor<T> visitor) {
+      return visitor.visit(this);
+    }
   }
   </#list>
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java 
b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index df50897..6b1aa04 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -29,6 +29,9 @@
 
 package org.apache.arrow.vector;
 
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import java.util.Collections;
+
 <#include "/@includes/vv_imports.ftl" />
 
 /**
@@ -39,7 +42,7 @@ package org.apache.arrow.vector;
  * NB: this class is automatically generated from ${.template_name} and 
ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${className} extends BaseDataValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, 
NullableVector{
+public final class ${className} extends BaseDataValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, 
NullableVector, FieldVector {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(${className}.class);
 
   private final FieldReader reader = new 
${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
@@ -54,6 +57,8 @@ public final class ${className} extends BaseDataValueVector 
implements <#if type
   private final Mutator mutator;
   private final Accessor accessor;
 
+  private final List<BufferBacked> innerVectors;
+
   <#if minor.class == "Decimal">
   private final int precision;
   private final int scale;
@@ -66,6 +71,10 @@ public final class ${className} extends BaseDataValueVector 
implements <#if type
     mutator = new Mutator();
     accessor = new Accessor();
     field = new Field(name, true, new Decimal(precision, scale), null);
+    innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
+        bits,
+        values
+    ));
   }
   <#else>
   public ${className}(String name, BufferAllocator allocator) {
@@ -88,9 +97,9 @@ public final class ${className} extends BaseDataValueVector 
implements <#if type
   <#elseif minor.class == "Time">
     field = new Field(name, true, new 
org.apache.arrow.vector.types.pojo.ArrowType.Time(), null);
   <#elseif minor.class == "Float4">
-    field = new Field(name, true, new FloatingPoint(0), null);
+    field = new Field(name, true, new 
FloatingPoint(org.apache.arrow.flatbuf.Precision.SINGLE), null);
   <#elseif minor.class == "Float8">
-    field = new Field(name, true, new FloatingPoint(1), null);
+    field = new Field(name, true, new 
FloatingPoint(org.apache.arrow.flatbuf.Precision.DOUBLE), null);
   <#elseif minor.class == "TimeStamp">
     field = new Field(name, true, new 
org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(""), null);
   <#elseif minor.class == "IntervalDay">
@@ -104,10 +113,44 @@ public final class ${className} extends 
BaseDataValueVector implements <#if type
   <#elseif minor.class == "Bit">
     field = new Field(name, true, new Bool(), null);
   </#if>
+    innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
+        bits,
+        <#if type.major = "VarLen">
+        values.offsetVector,
+        </#if>
+        values
+    ));
   }
   </#if>
 
   @Override
+  public List<BufferBacked> getFieldInnerVectors() {
+    return innerVectors;
+  }
+
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    if (!children.isEmpty()) {
+      throw new IllegalArgumentException("primitive type vector ${className} 
can not have children: " + children);
+    }
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), 
ownBuffers);
+    // TODO: do something with the sizes in fieldNode?
+  }
+
+  public List<ArrowBuf> getFieldBuffers() {
+    return 
org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+  }
+
+  @Override
   public Field getField() {
     return field;
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java 
b/java/vector/src/main/codegen/templates/UnionVector.java
index 1fef490..72125fa 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -42,6 +42,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
 import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+
+import static org.apache.arrow.flatbuf.UnionMode.Sparse;
+
 
 /*
  * This class is generated using freemarker and the ${.template_name} template.
@@ -57,7 +61,7 @@ import org.apache.arrow.vector.util.CallBack;
  * For performance reasons, UnionVector stores a cached reference to each 
subtype vector, to avoid having to do the map lookup
  * each time the vector is accessed.
  */
-public class UnionVector implements ValueVector {
+public class UnionVector implements FieldVector {
 
   private String name;
   private BufferAllocator allocator;
@@ -95,6 +99,34 @@ public class UnionVector implements ValueVector {
     return MinorType.UNION;
   }
 
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    getMap().initializeChildrenFromFields(children);
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return getMap().getChildrenFromFields();
+  }
+
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    // TODO
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ArrowBuf> getFieldBuffers() {
+    // TODO
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<BufferBacked> getFieldInnerVectors() {
+    // TODO
+    throw new UnsupportedOperationException();
+  }
+  
   public MapVector getMap() {
     if (mapVector == null) {
       int vectorCount = internalMap.size();
@@ -203,7 +235,7 @@ public class UnionVector implements ValueVector {
     for (ValueVector v : internalMap.getChildren()) {
       childFields.add(v.getField());
     }
-    return new Field(name, true, new ArrowType.Union(), childFields);
+    return new Field(name, true, new ArrowType.Union(Sparse), childFields);
   }
 
   @Override
@@ -237,10 +269,10 @@ public class UnionVector implements ValueVector {
     copyFrom(inIndex, outIndex, from);
   }
 
-  public ValueVector addVector(ValueVector v) {
+  public FieldVector addVector(FieldVector v) {
     String name = v.getMinorType().name().toLowerCase();
     Preconditions.checkState(internalMap.getChild(name) == null, 
String.format("%s vector already exists", name));
-    final ValueVector newVector = internalMap.addOrGet(name, v.getMinorType(), 
v.getClass());
+    final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), 
v.getClass());
     v.makeTransferPair(newVector).transfer();
     internalMap.putChild(name, newVector);
     if (callBack != null) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
index 05b7cf1..c22258d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
@@ -17,15 +17,38 @@
  */
 package org.apache.arrow.vector;
 
-import io.netty.buffer.ArrowBuf;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 
+import io.netty.buffer.ArrowBuf;
+
 
-public abstract class BaseDataValueVector extends BaseValueVector {
+public abstract class BaseDataValueVector extends BaseValueVector implements 
BufferBacked {
 
   protected final static byte[] emptyByteArray = new byte[]{}; // Nullable 
vectors use this
 
+  public static void load(List<BufferBacked> vectors, List<ArrowBuf> buffers) {
+    int expectedSize = vectors.size();
+    if (buffers.size() != expectedSize) {
+      throw new IllegalArgumentException("Illegal buffer count, expected " + 
expectedSize + ", got: " + buffers.size());
+    }
+    for (int i = 0; i < expectedSize; i++) {
+      vectors.get(i).load(buffers.get(i));
+    }
+  }
+
+  public static List<ArrowBuf> unload(List<BufferBacked> vectors) {
+    List<ArrowBuf> result = new ArrayList<>(vectors.size());
+    for (BufferBacked vector : vectors) {
+      result.add(vector.unLoad());
+    }
+    return result;
+  }
+
+  // TODO: Nullable vectors extend BaseDataValueVector but do not use the data 
field
+  // We should fix the inheritance tree
   protected ArrowBuf data;
 
   public BaseDataValueVector(String name, BufferAllocator allocator) {
@@ -82,6 +105,17 @@ public abstract class BaseDataValueVector extends 
BaseValueVector {
     return data;
   }
 
+  @Override
+  public void load(ArrowBuf data) {
+    this.data.release();
+    this.data = data.retain(allocator);
+  }
+
+  @Override
+  public ArrowBuf unLoad() {
+    return this.data.readerIndex(0);
+  }
+
   /**
    * This method has a similar effect of allocateNew() without actually 
clearing and reallocating
    * the value vector. The purpose is to move the value vector to a "mutate" 
state

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java 
b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
new file mode 100644
index 0000000..d1c262d
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Content is backed by a buffer and can be loaded/unloaded
+ */
+public interface BufferBacked {
+
+  void load(ArrowBuf data);
+
+  ArrowBuf unLoad();
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
new file mode 100644
index 0000000..b28433c
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.util.List;
+
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * A vector corresponding to a Field in the schema
+ * It has inner vectors backed by buffers (validity, offsets, data, ...)
+ */
+public interface FieldVector extends ValueVector {
+
+  /**
+   * Initializes the child vectors
+   * to be later loaded with loadBuffers
+   * @param children the schema
+   */
+  void initializeChildrenFromFields(List<Field> children);
+
+  /**
+   * the returned list is the same size as the list passed to 
initializeChildrenFromFields
+   * @return the children according to schema (empty for primitive types)
+   */
+  List<FieldVector> getChildrenFromFields();
+
+  /**
+   * loads data in the vectors
+   * (ownBuffers must be the same size as getFieldVectors())
+   * @param fieldNode the fieldNode
+   * @param ownBuffers the buffers for this Field (own buffers only, children 
not included)
+   */
+  void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers);
+
+  /**
+   * (same size as getFieldVectors() since it is their content)
+   * @return the buffers containing the data for this vector (ready for 
reading)
+   */
+  List<ArrowBuf> getFieldBuffers();
+
+  /**
+   * @return the inner vectors for this field as defined by the TypeLayout
+   */
+  List<BufferBacked> getFieldInnerVectors();
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
index 35321c9..ba7790e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
@@ -19,14 +19,14 @@ package org.apache.arrow.vector;
 
 import java.io.Closeable;
 
-import io.netty.buffer.ArrowBuf;
-
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.util.TransferPair;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.TransferPair;
+
+import io.netty.buffer.ArrowBuf;
 
 /**
  * An abstraction that is used to store a sequence of values in an individual 
column.

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
new file mode 100644
index 0000000..58ac68b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.common.collect.Iterators;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Loads buffers into vectors
+ */
+public class VectorLoader {
+  private final List<FieldVector> fieldVectors;
+  private final List<Field> fields;
+
+  /**
+   * will create children in root based on schema
+   * @param schema the expected schema
+   * @param root the root to add vectors to based on schema
+   */
+  public VectorLoader(Schema schema, FieldVector root) {
+    super();
+    this.fields = schema.getFields();
+    root.initializeChildrenFromFields(fields);
+    this.fieldVectors = root.getChildrenFromFields();
+    if (this.fieldVectors.size() != fields.size()) {
+      throw new IllegalArgumentException("The root vector did not create the 
right number of children. found " + fieldVectors.size() + " expected " + 
fields.size());
+    }
+  }
+
+  /**
+   * Loads the record batch in the vectors
+   * will not close the record batch
+   * @param recordBatch
+   */
+  public void load(ArrowRecordBatch recordBatch) {
+    Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
+    Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
+    for (int i = 0; i < fields.size(); ++i) {
+      Field field = fields.get(i);
+      FieldVector fieldVector = fieldVectors.get(i);
+      loadBuffers(fieldVector, field, buffers, nodes);
+    }
+    if (nodes.hasNext() || buffers.hasNext()) {
+      throw new IllegalArgumentException("not all nodes and buffers where 
consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + 
Iterators.toString(buffers));
+    }
+  }
+
+  private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> 
buffers, Iterator<ArrowFieldNode> nodes) {
+    ArrowFieldNode fieldNode = nodes.next();
+    List<VectorLayout> typeLayout = field.getTypeLayout().getVectors();
+    List<ArrowBuf> ownBuffers = new ArrayList<>(typeLayout.size());
+    for (int j = 0; j < typeLayout.size(); j++) {
+      ownBuffers.add(buffers.next());
+    }
+    try {
+      vector.loadFieldBuffers(fieldNode, ownBuffers);
+    } catch (RuntimeException e) {
+      throw new IllegalArgumentException("Could not load buffers for field " + 
field);
+    }
+    List<Field> children = field.getChildren();
+    if (children.size() > 0) {
+      List<FieldVector> childrenFromFields = vector.getChildrenFromFields();
+      checkArgument(children.size() == childrenFromFields.size(), "should have 
as many children as in the schema: found " + childrenFromFields.size() + " 
expected " + children.size());
+      for (int i = 0; i < childrenFromFields.size(); i++) {
+        Field child = children.get(i);
+        FieldVector fieldVector = childrenFromFields.get(i);
+        loadBuffers(fieldVector, child, buffers, nodes);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
new file mode 100644
index 0000000..e4d37bf
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.vector.ValueVector.Accessor;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import io.netty.buffer.ArrowBuf;
+
+public class VectorUnloader {
+
+  private final Schema schema;
+  private final int valueCount;
+  private final List<FieldVector> vectors;
+
+  public VectorUnloader(FieldVector parent) {
+    super();
+    this.schema = new Schema(parent.getField().getChildren());
+    this.valueCount = parent.getAccessor().getValueCount();
+    this.vectors = parent.getChildrenFromFields();
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public ArrowRecordBatch getRecordBatch() {
+    List<ArrowFieldNode> nodes = new ArrayList<>();
+    List<ArrowBuf> buffers = new ArrayList<>();
+    for (FieldVector vector : vectors) {
+      appendNodes(vector, nodes, buffers);
+    }
+    return new ArrowRecordBatch(valueCount, nodes, buffers);
+  }
+
+  private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, 
List<ArrowBuf> buffers) {
+    Accessor accessor = vector.getAccessor();
+    int nullCount = 0;
+    // TODO: should not have to do that
+    // we can do that a lot more efficiently (for example with 
Long.bitCount(i))
+    for (int i = 0; i < accessor.getValueCount(); i++) {
+      if (accessor.isNull(i)) {
+        nullCount ++;
+      }
+    }
+    nodes.add(new ArrowFieldNode(accessor.getValueCount(), nullCount));
+    List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
+    List<ArrowVectorType> expectedBuffers = 
vector.getField().getTypeLayout().getVectorTypes();
+    if (fieldBuffers.size() != expectedBuffers.size()) {
+      throw new IllegalArgumentException("wrong number of buffers for field " 
+ vector.getField() + ". found: " + fieldBuffers);
+    }
+    buffers.addAll(fieldBuffers);
+    for (FieldVector child : vector.getChildrenFromFields()) {
+      appendNodes(child, nodes, buffers);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
index 705a24b..c2482ad 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
@@ -17,25 +17,23 @@
  */
 package org.apache.arrow.vector;
 
-import com.google.flatbuffers.FlatBufferBuilder;
-import io.netty.buffer.ArrowBuf;
-
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
-import org.apache.arrow.flatbuf.Type;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.impl.NullReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Null;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.TransferPair;
 
-import com.google.common.collect.Iterators;
+import io.netty.buffer.ArrowBuf;
 
-public class ZeroVector implements ValueVector {
+public class ZeroVector implements FieldVector {
   public final static ZeroVector INSTANCE = new ZeroVector();
 
   private final String name = "[DEFAULT]";
@@ -175,4 +173,33 @@ public class ZeroVector implements ValueVector {
   public FieldReader getReader() {
     return NullReader.INSTANCE;
   }
+
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    if (!children.isEmpty()) {
+      throw new IllegalArgumentException("Zero vector has no children");
+    }
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    if (!ownBuffers.isEmpty()) {
+      throw new IllegalArgumentException("Zero vector has no buffers");
+    }
+  }
+
+  @Override
+  public List<ArrowBuf> getFieldBuffers() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<BufferBacked> getFieldInnerVectors() {
+    return Collections.emptyList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index ed77975..2f68886 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -17,22 +17,13 @@
  */
 package org.apache.arrow.vector.complex;
 
-import java.util.Collection;
-
-import javax.annotation.Nullable;
-
-import org.apache.arrow.flatbuf.Field;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.util.CallBack;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
 /**
  * Base class for composite vectors.
  *
@@ -65,8 +56,8 @@ public abstract class AbstractContainerVector implements 
ValueVector {
   /**
    * Returns a {@link org.apache.arrow.vector.ValueVector} corresponding to 
the given field name if exists or null.
    */
-  public ValueVector getChild(String name) {
-    return getChild(name, ValueVector.class);
+  public FieldVector getChild(String name) {
+    return getChild(name, FieldVector.class);
   }
 
   /**
@@ -81,7 +72,7 @@ public abstract class AbstractContainerVector implements 
ValueVector {
 
   protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz) {
     if (clazz.isAssignableFrom(v.getClass())) {
-      return (T) v;
+      return clazz.cast(v);
     }
     throw new IllegalStateException(String.format("Vector requested [%s] was 
different than type stored [%s]. Arrow doesn't yet support hetergenous types.", 
clazz.getSimpleName(), v.getClass().getSimpleName()));
   }
@@ -94,10 +85,10 @@ public abstract class AbstractContainerVector implements 
ValueVector {
   public abstract int size();
 
   // add a new vector with the input MajorType or return the existing vector 
if we already added one with the same type
-  public abstract <T extends ValueVector> T addOrGet(String name, MinorType 
minorType, Class<T> clazz, int... precisionScale);
+  public abstract <T extends FieldVector> T addOrGet(String name, MinorType 
minorType, Class<T> clazz, int... precisionScale);
 
   // return the child vector with the input name
-  public abstract <T extends ValueVector> T getChild(String name, Class<T> 
clazz);
+  public abstract <T extends FieldVector> T getChild(String name, Class<T> 
clazz);
 
   // return the child vector's ordinal in the composite container
   public abstract VectorWithOrdinal getChildVectorWithOrdinal(String name);

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
index 5964f80..23b4997 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
@@ -17,23 +17,24 @@
  */
 package org.apache.arrow.vector.complex;
 
-import com.google.common.collect.ImmutableList;
-import io.netty.buffer.ArrowBuf;
-
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.arrow.flatbuf.Field;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.MapWithOrdinal;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import io.netty.buffer.ArrowBuf;
+
 /*
  * Base class for MapVectors. Currently used by RepeatedMapVector and MapVector
  */
@@ -41,7 +42,7 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
 
   // Maintains a map with key as field name and value is the vector itself
-  private final MapWithOrdinal<String, ValueVector> vectors =  new 
MapWithOrdinal<>();
+  private final MapWithOrdinal<String, FieldVector> vectors =  new 
MapWithOrdinal<>();
 
   protected AbstractMapVector(String name, BufferAllocator allocator, CallBack 
callBack) {
     super(name, allocator, callBack);
@@ -109,19 +110,19 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
    * @return resultant {@link org.apache.arrow.vector.ValueVector}
    */
   @Override
-  public <T extends ValueVector> T addOrGet(String name, MinorType minorType, 
Class<T> clazz, int... precisionScale) {
+  public <T extends FieldVector> T addOrGet(String name, MinorType minorType, 
Class<T> clazz, int... precisionScale) {
     final ValueVector existing = getChild(name);
     boolean create = false;
     if (existing == null) {
       create = true;
     } else if (clazz.isAssignableFrom(existing.getClass())) {
-      return (T) existing;
+      return clazz.cast(existing);
     } else if (nullFilled(existing)) {
       existing.clear();
       create = true;
     }
     if (create) {
-      final T vector = (T) minorType.getNewVector(name, allocator, callBack, 
precisionScale);
+      final T vector = clazz.cast(minorType.getNewVector(name, allocator, 
callBack, precisionScale));
       putChild(name, vector);
       if (callBack!=null) {
         callBack.doWork();
@@ -153,7 +154,7 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
    * field name if exists or null.
    */
   @Override
-  public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
+  public <T extends FieldVector> T getChild(String name, Class<T> clazz) {
     final ValueVector v = vectors.get(name.toLowerCase());
     if (v == null) {
       return null;
@@ -161,12 +162,25 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
     return typeify(v, clazz);
   }
 
+  protected ValueVector add(String name, MinorType minorType, int... 
precisionScale) {
+    final ValueVector existing = getChild(name);
+    if (existing != null) {
+      throw new IllegalStateException(String.format("Vector already exists: 
Existing[%s], Requested[%s] ", existing.getClass().getSimpleName(), minorType));
+    }
+    FieldVector vector = minorType.getNewVector(name, allocator, callBack, 
precisionScale);
+    putChild(name, vector);
+    if (callBack!=null) {
+      callBack.doWork();
+    }
+    return vector;
+  }
+
   /**
    * Inserts the vector with the given name if it does not exist else replaces 
it with the new value.
    *
    * Note that this method does not enforce any vector type check nor throws a 
schema change exception.
    */
-  protected void putChild(String name, ValueVector vector) {
+  protected void putChild(String name, FieldVector vector) {
     putVector(name, vector);
   }
 
@@ -175,7 +189,7 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
    * @param name  field name
    * @param vector  vector to be inserted
    */
-  protected void putVector(String name, ValueVector vector) {
+  protected void putVector(String name, FieldVector vector) {
     final ValueVector old = vectors.put(
         Preconditions.checkNotNull(name, "field name cannot be 
null").toLowerCase(),
         Preconditions.checkNotNull(vector, "vector cannot be null")
@@ -189,9 +203,9 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
   /**
    * Returns a sequence of underlying child vectors.
    */
-  protected List<ValueVector> getChildren() {
+  protected List<FieldVector> getChildren() {
     int size = vectors.size();
-    List<ValueVector> children = new ArrayList<>();
+    List<FieldVector> children = new ArrayList<>();
     for (int i = 0; i < size; i++) {
       children.add(vectors.getByOrdinal(i));
     }
@@ -216,7 +230,7 @@ public abstract class AbstractMapVector extends 
AbstractContainerVector {
 
   @Override
   public Iterator<ValueVector> iterator() {
-    return vectors.values().iterator();
+    return 
Collections.<ValueVector>unmodifiableCollection(vectors.values()).iterator();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 4226274..517d20c 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -17,8 +17,6 @@
  */
 package org.apache.arrow.vector.complex;
 
-import io.netty.buffer.ArrowBuf;
-
 import java.util.Collections;
 import java.util.Iterator;
 
@@ -26,29 +24,32 @@ import org.apache.arrow.flatbuf.Type;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.AddOrGetResult;
 import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ObjectArrays;
-import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
+
+import io.netty.buffer.ArrowBuf;
 
 public abstract class BaseRepeatedValueVector extends BaseValueVector 
implements RepeatedValueVector {
 
-  public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
+  public final static FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
   public final static String OFFSETS_VECTOR_NAME = "$offsets$";
   public final static String DATA_VECTOR_NAME = "$data$";
 
   protected final UInt4Vector offsets;
-  protected ValueVector vector;
+  protected FieldVector vector;
 
   protected BaseRepeatedValueVector(String name, BufferAllocator allocator) {
     this(name, allocator, DEFAULT_DATA_VECTOR);
   }
 
-  protected BaseRepeatedValueVector(String name, BufferAllocator allocator, 
ValueVector vector) {
+  protected BaseRepeatedValueVector(String name, BufferAllocator allocator, 
FieldVector vector) {
     super(name, allocator);
     this.offsets = new UInt4Vector(OFFSETS_VECTOR_NAME, allocator);
     this.vector = Preconditions.checkNotNull(vector, "data vector cannot be 
null");
@@ -83,7 +84,7 @@ public abstract class BaseRepeatedValueVector extends 
BaseValueVector implements
   }
 
   @Override
-  public ValueVector getDataVector() {
+  public FieldVector getDataVector() {
     return vector;
   }
 
@@ -121,7 +122,7 @@ public abstract class BaseRepeatedValueVector extends 
BaseValueVector implements
 
   @Override
   public Iterator<ValueVector> iterator() {
-    return Collections.singleton(getDataVector()).iterator();
+    return Collections.<ValueVector>singleton(getDataVector()).iterator();
   }
 
   @Override
@@ -167,7 +168,7 @@ public abstract class BaseRepeatedValueVector extends 
BaseValueVector implements
     return new AddOrGetResult<>((T)vector, created);
   }
 
-  protected void replaceDataVector(ValueVector v) {
+  protected void replaceDataVector(FieldVector v) {
     vector.clear();
     vector = v;
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index c6c6b09..2984c36 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -18,15 +18,18 @@
  
******************************************************************************/
 package org.apache.arrow.vector.complex;
 
-import com.google.common.collect.ImmutableList;
-import com.google.flatbuffers.FlatBufferBuilder;
-import io.netty.buffer.ArrowBuf;
+import static java.util.Collections.singletonList;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.BaseDataValueVector;
+import org.apache.arrow.vector.BufferBacked;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.UInt1Vector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.ValueVector;
@@ -36,18 +39,24 @@ import org.apache.arrow.vector.complex.impl.UnionListReader;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.CallBack;
 import org.apache.arrow.vector.util.JsonStringArrayList;
 import org.apache.arrow.vector.util.TransferPair;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ObjectArrays;
 
-public class ListVector extends BaseRepeatedValueVector {
+import io.netty.buffer.ArrowBuf;
+
+public class ListVector extends BaseRepeatedValueVector implements FieldVector 
{
 
-  UInt4Vector offsets;
+  final UInt4Vector offsets;
   final UInt1Vector bits;
+  private final List<BufferBacked> innerVectors;
   private Mutator mutator = new Mutator();
   private Accessor accessor = new Accessor();
   private UnionListWriter writer;
@@ -57,12 +66,46 @@ public class ListVector extends BaseRepeatedValueVector {
   public ListVector(String name, BufferAllocator allocator, CallBack callBack) 
{
     super(name, allocator);
     this.bits = new UInt1Vector("$bits$", allocator);
-    offsets = getOffsetVector();
+    this.offsets = getOffsetVector();
+    this.innerVectors = 
Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets));
     this.writer = new UnionListWriter(this);
     this.reader = new UnionListReader(this);
     this.callBack = callBack;
   }
 
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    if (children.size() != 1) {
+      throw new IllegalArgumentException("Lists have only one child. Found: " 
+ children);
+    }
+    Field field = children.get(0);
+    MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
+    AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType);
+    if (!addOrGetVector.isCreated()) {
+      throw new IllegalArgumentException("Child vector already existed: " + 
addOrGetVector.getVector());
+    }
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return singletonList(getDataVector());
+  }
+
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
+  }
+
+  @Override
+  public List<ArrowBuf> getFieldBuffers() {
+    return BaseDataValueVector.unload(getFieldInnerVectors());
+  }
+
+  @Override
+  public List<BufferBacked> getFieldInnerVectors() {
+    return innerVectors;
+  }
+
   public UnionListWriter getWriter() {
     return writer;
   }
@@ -86,7 +129,7 @@ public class ListVector extends BaseRepeatedValueVector {
   }
 
   @Override
-  public ValueVector getDataVector() {
+  public FieldVector getDataVector() {
     return vector;
   }
 
@@ -298,4 +341,5 @@ public class ListVector extends BaseRepeatedValueVector {
       bits.getMutator().setValueCount(valueCount);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index 0cb613e..e369658 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -17,10 +17,10 @@
  */
 package org.apache.arrow.vector.complex;
 
-import io.netty.buffer.ArrowBuf;
-
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,13 +28,17 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BaseDataValueVector;
 import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.BufferBacked;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.ComplexHolder;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.CallBack;
@@ -45,7 +49,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Ordering;
 import com.google.common.primitives.Ints;
 
-public class MapVector extends AbstractMapVector {
+import io.netty.buffer.ArrowBuf;
+
+public class MapVector extends AbstractMapVector implements FieldVector {
   //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MapVector.class);
 
   private final SingleMapReaderImpl reader = new 
SingleMapReaderImpl(MapVector.this);
@@ -53,6 +59,9 @@ public class MapVector extends AbstractMapVector {
   private final Mutator mutator = new Mutator();
   int valueCount;
 
+  // TODO: validity vector
+  private final List<BufferBacked> innerVectors = 
Collections.unmodifiableList(Arrays.<BufferBacked>asList());
+
   public MapVector(String name, BufferAllocator allocator, CallBack callBack){
     super(name, allocator, callBack);
   }
@@ -120,7 +129,7 @@ public class MapVector extends AbstractMapVector {
     int expectedSize = getBufferSize();
     int actualSize   = super.getBufferSize();
 
-    Preconditions.checkArgument(expectedSize == actualSize);
+    Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " 
!= " + actualSize);
     return super.getBuffers(clear);
   }
 
@@ -159,7 +168,7 @@ public class MapVector extends AbstractMapVector {
       this.to.ephPair = null;
 
       int i = 0;
-      ValueVector vector;
+      FieldVector vector;
       for (String child:from.getChildFieldNames()) {
         int preSize = to.size();
         vector = from.getChild(child);
@@ -175,7 +184,7 @@ public class MapVector extends AbstractMapVector {
         // (This is similar to what happens in ScanBatch where the children 
cannot be added till they are
         // read). To take care of this, we ensure that the hashCode of the 
MaterializedField does not
         // include the hashCode of the children but is based only on 
MaterializedField$key.
-        final ValueVector newVector = to.addOrGet(child, 
vector.getMinorType(), vector.getClass());
+        final FieldVector newVector = to.addOrGet(child, 
vector.getMinorType(), vector.getClass());
         if (allocate && to.size() != preSize) {
           newVector.allocateNew();
         }
@@ -315,13 +324,45 @@ public class MapVector extends AbstractMapVector {
 
   @Override
   public void close() {
-    final Collection<ValueVector> vectors = getChildren();
-    for (final ValueVector v : vectors) {
+    final Collection<FieldVector> vectors = getChildren();
+    for (final FieldVector v : vectors) {
       v.close();
     }
     vectors.clear();
+
     valueCount = 0;
 
     super.close();
  }
+
+  @Override
+  public void initializeChildrenFromFields(List<Field> children) {
+    for (Field field : children) {
+      MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
+      FieldVector vector = (FieldVector)this.add(field.getName(), minorType);
+      vector.initializeChildrenFromFields(field.getChildren());
+    }
+  }
+
+  @Override
+  public List<FieldVector> getChildrenFromFields() {
+    return getChildren();
+  }
+
+  @Override
+  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> 
ownBuffers) {
+    BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
+    // TODO: something with fieldNode?
+  }
+
+  @Override
+  public List<ArrowBuf> getFieldBuffers() {
+    return BaseDataValueVector.unload(getFieldInnerVectors());
+  }
+
+  @Override
+  public List<BufferBacked> getFieldInnerVectors() {
+    return innerVectors;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
index 4d2adfb..89bfefc 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
@@ -22,9 +22,9 @@ import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StateTool;
 import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
 
 import com.google.common.base.Preconditions;
-import org.apache.arrow.vector.types.pojo.Field;
 
 public class ComplexWriterImpl extends AbstractFieldWriter implements 
ComplexWriter {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
index 586b128..c282688 100644
--- 
a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
+++ 
b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.arrow.vector.complex.impl;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
 import org.apache.arrow.vector.complex.AbstractMapVector;
@@ -129,7 +130,7 @@ public class PromotableWriter extends 
AbstractPromotableFieldWriter {
     } else if (listVector != null) {
       unionVector = listVector.promoteToUnion();
     }
-    unionVector.addVector(tp.getTo());
+    unionVector.addVector((FieldVector)tp.getTo());
     writer = new UnionWriter(unionVector);
     writer.setPosition(idx());
     for (int i = 0; i < idx(); i++) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
new file mode 100644
index 0000000..90fb02b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.vector.schema.FBSerializable;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+public class ArrowBlock implements FBSerializable {
+
+  private final long offset;
+  private final int metadataLength;
+  private final long bodyLength;
+
+  public ArrowBlock(long offset, int metadataLength, long bodyLength) {
+    super();
+    this.offset = offset;
+    this.metadataLength = metadataLength;
+    this.bodyLength = bodyLength;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public int getMetadataLength() {
+    return metadataLength;
+  }
+
+  public long getBodyLength() {
+    return bodyLength;
+  }
+
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    return Block.createBlock(builder, offset, metadataLength, bodyLength);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32));
+    result = prime * result + metadataLength;
+    result = prime * result + (int) (offset ^ (offset >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ArrowBlock other = (ArrowBlock) obj;
+    if (bodyLength != other.bodyLength)
+      return false;
+    if (metadataLength != other.metadataLength)
+      return false;
+    if (offset != other.offset)
+      return false;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
new file mode 100644
index 0000000..01e175b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+public class ArrowFooter implements FBSerializable {
+
+  private final Schema schema;
+
+  private final List<ArrowBlock> dictionaries;
+
+  private final List<ArrowBlock> recordBatches;
+
+  public ArrowFooter(Schema schema, List<ArrowBlock> dictionaries, 
List<ArrowBlock> recordBatches) {
+    super();
+    this.schema = schema;
+    this.dictionaries = dictionaries;
+    this.recordBatches = recordBatches;
+  }
+
+  public ArrowFooter(Footer footer) {
+    this(
+        Schema.convertSchema(footer.schema()),
+        dictionaries(footer),
+        recordBatches(footer)
+        );
+  }
+
+  private static List<ArrowBlock> recordBatches(Footer footer) {
+    List<ArrowBlock> recordBatches = new ArrayList<>();
+    Block tempBLock = new Block();
+    int recordBatchesLength = footer.recordBatchesLength();
+    for (int i = 0; i < recordBatchesLength; i++) {
+      Block block = footer.recordBatches(tempBLock, i);
+      recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), 
block.bodyLength()));
+    }
+    return recordBatches;
+  }
+
+  private static List<ArrowBlock> dictionaries(Footer footer) {
+    List<ArrowBlock> dictionaries = new ArrayList<>();
+    Block tempBLock = new Block();
+    int dictionariesLength = footer.dictionariesLength();
+    for (int i = 0; i < dictionariesLength; i++) {
+      Block block = footer.dictionaries(tempBLock, i);
+      dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), 
block.bodyLength()));
+    }
+    return dictionaries;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public List<ArrowBlock> getDictionaries() {
+    return dictionaries;
+  }
+
+  public List<ArrowBlock> getRecordBatches() {
+    return recordBatches;
+  }
+
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    int schemaIndex = schema.getSchema(builder);
+    Footer.startDictionariesVector(builder, dictionaries.size());
+    int dicsOffset = endVector(builder, dictionaries);
+    Footer.startRecordBatchesVector(builder, recordBatches.size());
+    int rbsOffset = endVector(builder, recordBatches);
+    Footer.startFooter(builder);
+    Footer.addSchema(builder, schemaIndex);
+    Footer.addDictionaries(builder, dicsOffset);
+    Footer.addRecordBatches(builder, rbsOffset);
+    return Footer.endFooter(builder);
+  }
+
+  private int endVector(FlatBufferBuilder builder, List<ArrowBlock> blocks) {
+    for (ArrowBlock block : blocks) {
+      block.writeTo(builder);
+    }
+    return builder.endVector();
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((dictionaries == null) ? 0 : 
dictionaries.hashCode());
+    result = prime * result + ((recordBatches == null) ? 0 : 
recordBatches.hashCode());
+    result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ArrowFooter other = (ArrowFooter) obj;
+    if (dictionaries == null) {
+      if (other.dictionaries != null)
+        return false;
+    } else if (!dictionaries.equals(other.dictionaries))
+      return false;
+    if (recordBatches == null) {
+      if (other.recordBatches != null)
+        return false;
+    } else if (!recordBatches.equals(other.recordBatches))
+      return false;
+    if (schema == null) {
+      if (other.schema != null)
+        return false;
+    } else if (!schema.equals(other.schema))
+      return false;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
new file mode 100644
index 0000000..bbcd3e9
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Buffer;
+import org.apache.arrow.flatbuf.FieldNode;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ArrowBuf;
+
+public class ArrowReader implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ArrowReader.class);
+
+  private static final byte[] MAGIC = "ARROW1".getBytes();
+
+  private final SeekableByteChannel in;
+
+  private final BufferAllocator allocator;
+
+  private ArrowFooter footer;
+
+  public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
+    super();
+    this.in = in;
+    this.allocator = allocator;
+  }
+
+  private int readFully(ArrowBuf buffer, int l) throws IOException {
+    int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
+    buffer.writerIndex(n);
+    if (n != l) {
+      throw new IllegalStateException(n + " != " + l);
+    }
+    return n;
+  }
+
+  private int readFully(ByteBuffer buffer) throws IOException {
+    int total = 0;
+    int n;
+    do {
+      n = in.read(buffer);
+      total += n;
+    } while (n >= 0 && buffer.remaining() > 0);
+    buffer.flip();
+    return total;
+  }
+
+  private static int bytesToInt(byte[] bytes) {
+    return ((int)(bytes[3] & 255) << 24) +
+           ((int)(bytes[2] & 255) << 16) +
+           ((int)(bytes[1] & 255) <<  8) +
+           ((int)(bytes[0] & 255) <<  0);
+  }
+
+  public ArrowFooter readFooter() throws IOException {
+    if (footer == null) {
+      if (in.size() <= (MAGIC.length * 2 + 4)) {
+        throw new InvalidArrowFileException("file too small: " + in.size());
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(4 + MAGIC.length);
+      long footerLengthOffset = in.size() - buffer.remaining();
+      in.position(footerLengthOffset);
+      readFully(buffer);
+      byte[] array = buffer.array();
+      if (!Arrays.equals(MAGIC, Arrays.copyOfRange(array, 4, array.length))) {
+        throw new InvalidArrowFileException("missing Magic number " + 
Arrays.toString(buffer.array()));
+      }
+      int footerLength = bytesToInt(array);
+      if (footerLength <= 0 || footerLength + MAGIC.length * 2 + 4 > 
in.size()) {
+        throw new InvalidArrowFileException("invalid footer length: " + 
footerLength);
+      }
+      long footerOffset = footerLengthOffset - footerLength;
+      LOGGER.debug(String.format("Footer starts at %d, length: %d", 
footerOffset, footerLength));
+      ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
+      in.position(footerOffset);
+      readFully(footerBuffer);
+      Footer footerFB = Footer.getRootAsFooter(footerBuffer);
+      this.footer = new ArrowFooter(footerFB);
+    }
+    return footer;
+  }
+
+  // TODO: read dictionaries
+
+  public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws 
IOException {
+    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", 
recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), 
recordBatchBlock.getBodyLength()));
+    int l = (int)(recordBatchBlock.getMetadataLength() + 
recordBatchBlock.getBodyLength());
+    if (l < 0) {
+      throw new InvalidArrowFileException("block invalid: " + 
recordBatchBlock);
+    }
+    final ArrowBuf buffer = allocator.buffer(l);
+    LOGGER.debug("allocated buffer " + buffer);
+    in.position(recordBatchBlock.getOffset());
+    int n = readFully(buffer, l);
+    if (n != l) {
+      throw new IllegalStateException(n + " != " + l);
+    }
+    RecordBatch recordBatchFB = 
RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
+    int nodesLength = recordBatchFB.nodesLength();
+    final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), 
(int)recordBatchBlock.getBodyLength());
+    List<ArrowFieldNode> nodes = new ArrayList<>();
+    for (int i = 0; i < nodesLength; ++i) {
+      FieldNode node = recordBatchFB.nodes(i);
+      nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+    }
+    List<ArrowBuf> buffers = new ArrayList<>();
+    for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
+      Buffer bufferFB = recordBatchFB.buffers(i);
+      LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", 
bufferFB.offset(), bufferFB.length()));
+      ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), 
(int)bufferFB.length());
+      buffers.add(vectorBuffer);
+    }
+    ArrowRecordBatch arrowRecordBatch = new 
ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
+    LOGGER.debug("released buffer " + buffer);
+    buffer.release();
+    return arrowRecordBatch;
+  }
+
+  public void close() throws IOException {
+    in.close();
+  }
+
+}

Reply via email to