[SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0

## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python. This also includes an upgrade of 
Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow version 0.4.1 to 
0.8.0 include:

* Java refactoring for a simpler API
* Reduced Java heap usage and streamlined hot code paths
* Type support for DecimalType and ArrayType
* Improved type casting support in Python
* Simplified type checking in Python (see the brief sketch after this list)
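
The last two points can be illustrated with a minimal, hypothetical sketch (not part of 
this patch) of the pyarrow >= 0.8.0 APIs that the updated `serializers.py` and 
`types.py` rely on: the `pyarrow.types.is_*()` predicates for type checking, and 
conversion from pandas with an explicit null mask and target type.

```python
# Minimal sketch (not part of this patch), assuming pyarrow >= 0.8.0 and pandas are installed.
import pandas as pd
import pyarrow as pa

# Simplified type checking: use pyarrow.types predicates instead of comparing type objects.
t = pa.timestamp('us', tz='UTC')
assert pa.types.is_timestamp(t)

# Improved casting: pandas promotes a nullable integer series to float64 with NaNs;
# passing the null mask and target type lets Arrow build an int64 array with proper nulls.
s = pd.Series([1, 2, None])
arr = pa.Array.from_pandas(s, mask=s.isnull(), type=pa.int64())
print(arr)  # values: 1, 2, null
```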

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutl...@gmail.com>
Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59d52631
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59d52631
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59d52631

Branch: refs/heads/master
Commit: 59d52631eb86394f1d981419cb744c20bd4e0b87
Parents: cb9fc8d
Author: Bryan Cutler <cutl...@gmail.com>
Authored: Thu Dec 21 20:43:56 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Thu Dec 21 20:43:56 2017 +0900

----------------------------------------------------------------------
 .../spark/network/crypto/TransportCipher.java   |  41 +-
 .../network/protocol/MessageWithHeader.java     |  39 +-
 .../spark/network/sasl/SaslEncryption.java      |  41 +-
 .../spark/network/util/AbstractFileRegion.java  |  53 ++
 .../org/apache/spark/network/ProtocolSuite.java |   4 +-
 .../protocol/MessageWithHeaderSuite.java        |   7 +-
 .../org/apache/spark/storage/DiskStore.scala    |   9 +-
 dev/deps/spark-deps-hadoop-2.6                  |  10 +-
 dev/deps/spark-deps-hadoop-2.7                  |  10 +-
 pom.xml                                         |  12 +-
 python/pyspark/serializers.py                   |  27 +-
 python/pyspark/sql/dataframe.py                 |   2 +
 python/pyspark/sql/functions.py                 |  13 +-
 python/pyspark/sql/group.py                     |   2 +-
 python/pyspark/sql/session.py                   |   3 +
 python/pyspark/sql/tests.py                     |  12 +-
 python/pyspark/sql/types.py                     |  25 +-
 python/pyspark/sql/udf.py                       |  16 +-
 python/pyspark/sql/utils.py                     |   9 +
 .../execution/vectorized/ArrowColumnVector.java | 136 +++--
 .../sql/execution/arrow/ArrowConverters.scala   |  13 +-
 .../spark/sql/execution/arrow/ArrowWriter.scala | 132 ++---
 .../execution/python/ArrowPythonRunner.scala    |  27 +-
 .../execution/arrow/ArrowConvertersSuite.scala  | 571 ++-----------------
 .../vectorized/ArrowColumnVectorSuite.scala     | 150 +++--
 .../vectorized/ColumnarBatchSuite.scala         |  20 +-
 26 files changed, 515 insertions(+), 869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
index 7376d1d..e04524d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
@@ -30,10 +30,10 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
-import io.netty.util.AbstractReferenceCounted;
 import org.apache.commons.crypto.stream.CryptoInputStream;
 import org.apache.commons.crypto.stream.CryptoOutputStream;
 
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.apache.spark.network.util.ByteArrayReadableChannel;
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 
@@ -161,7 +161,7 @@ public class TransportCipher {
     }
   }
 
-  private static class EncryptedMessage extends AbstractReferenceCounted 
implements FileRegion {
+  private static class EncryptedMessage extends AbstractFileRegion {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
@@ -199,11 +199,46 @@ public class TransportCipher {
     }
 
     @Override
-    public long transfered() {
+    public long transferred() {
       return transferred;
     }
 
     @Override
+    public EncryptedMessage touch(Object o) {
+      super.touch(o);
+      if (region != null) {
+        region.touch(o);
+      }
+      if (buf != null) {
+        buf.touch(o);
+      }
+      return this;
+    }
+
+    @Override
+    public EncryptedMessage retain(int increment) {
+      super.retain(increment);
+      if (region != null) {
+        region.retain(increment);
+      }
+      if (buf != null) {
+        buf.retain(increment);
+      }
+      return this;
+    }
+
+    @Override
+    public boolean release(int decrement) {
+      if (region != null) {
+        region.release(decrement);
+      }
+      if (buf != null) {
+        buf.release(decrement);
+      }
+      return super.release(decrement);
+    }
+
+    @Override
     public long transferTo(WritableByteChannel target, long position) throws 
IOException {
       Preconditions.checkArgument(position == transfered(), "Invalid 
position.");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index 4f8781b..897d0f9 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -25,17 +25,17 @@ import javax.annotation.Nullable;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCountUtil;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.AbstractFileRegion;
 
 /**
  * A wrapper message that holds two separate pieces (a header and a body).
  *
  * The header must be a ByteBuf, while the body can be a ByteBuf or a 
FileRegion.
  */
-class MessageWithHeader extends AbstractReferenceCounted implements FileRegion 
{
+class MessageWithHeader extends AbstractFileRegion {
 
   @Nullable private final ManagedBuffer managedBuffer;
   private final ByteBuf header;
@@ -91,7 +91,7 @@ class MessageWithHeader extends AbstractReferenceCounted 
implements FileRegion {
   }
 
   @Override
-  public long transfered() {
+  public long transferred() {
     return totalBytesTransferred;
   }
 
@@ -160,4 +160,37 @@ class MessageWithHeader extends AbstractReferenceCounted 
implements FileRegion {
 
     return ret;
   }
+
+  @Override
+  public MessageWithHeader touch(Object o) {
+    super.touch(o);
+    header.touch(o);
+    ReferenceCountUtil.touch(body, o);
+    return this;
+  }
+
+  @Override
+  public MessageWithHeader retain(int increment) {
+    super.retain(increment);
+    header.retain(increment);
+    ReferenceCountUtil.retain(body, increment);
+    if (managedBuffer != null) {
+      for (int i = 0; i < increment; i++) {
+        managedBuffer.retain();
+      }
+    }
+    return this;
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    header.release(decrement);
+    ReferenceCountUtil.release(body, decrement);
+    if (managedBuffer != null) {
+      for (int i = 0; i < decrement; i++) {
+        managedBuffer.release();
+      }
+    }
+    return super.release(decrement);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
 
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
index 3d71eba..16ab4ef 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
@@ -32,8 +32,8 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.util.AbstractReferenceCounted;
 
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 import org.apache.spark.network.util.NettyUtils;
 
@@ -129,7 +129,7 @@ class SaslEncryption {
   }
 
   @VisibleForTesting
-  static class EncryptedMessage extends AbstractReferenceCounted implements 
FileRegion {
+  static class EncryptedMessage extends AbstractFileRegion {
 
     private final SaslEncryptionBackend backend;
     private final boolean isByteBuf;
@@ -183,10 +183,45 @@ class SaslEncryption {
      * Returns an approximation of the amount of data transferred. See {@link 
#count()}.
      */
     @Override
-    public long transfered() {
+    public long transferred() {
       return transferred;
     }
 
+    @Override
+    public EncryptedMessage touch(Object o) {
+      super.touch(o);
+      if (buf != null) {
+        buf.touch(o);
+      }
+      if (region != null) {
+        region.touch(o);
+      }
+      return this;
+    }
+
+    @Override
+    public EncryptedMessage retain(int increment) {
+      super.retain(increment);
+      if (buf != null) {
+        buf.retain(increment);
+      }
+      if (region != null) {
+        region.retain(increment);
+      }
+      return this;
+    }
+
+    @Override
+    public boolean release(int decrement) {
+      if (region != null) {
+        region.release(decrement);
+      }
+      if (buf != null) {
+        buf.release(decrement);
+      }
+      return super.release(decrement);
+    }
+
     /**
      * Transfers data from the original message to the channel, encrypting it 
in the process.
      *

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java
new file mode 100644
index 0000000..8651297
--- /dev/null
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+public abstract class AbstractFileRegion extends AbstractReferenceCounted 
implements FileRegion {
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public final long transfered() {
+    return transferred();
+  }
+
+  @Override
+  public AbstractFileRegion retain() {
+    super.retain();
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion retain(int increment) {
+    super.retain(increment);
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion touch() {
+    super.touch();
+    return this;
+  }
+
+  @Override
+  public AbstractFileRegion touch(Object o) {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
index bb1c40c..bc94f7c 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
@@ -56,7 +56,7 @@ public class ProtocolSuite {
         NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
 
     while (!serverChannel.outboundMessages().isEmpty()) {
-      clientChannel.writeInbound(serverChannel.readOutbound());
+      clientChannel.writeOneInbound(serverChannel.readOutbound());
     }
 
     assertEquals(1, clientChannel.inboundMessages().size());
@@ -72,7 +72,7 @@ public class ProtocolSuite {
         NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);
 
     while (!clientChannel.outboundMessages().isEmpty()) {
-      serverChannel.writeInbound(clientChannel.readOutbound());
+      serverChannel.writeOneInbound(clientChannel.readOutbound());
     }
 
     assertEquals(1, serverChannel.inboundMessages().size());

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
index b341c56..ecb66fc 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
@@ -23,8 +23,7 @@ import java.nio.channels.WritableByteChannel;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
+import org.apache.spark.network.util.AbstractFileRegion;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -108,7 +107,7 @@ public class MessageWithHeaderSuite {
     return Unpooled.wrappedBuffer(channel.getData());
   }
 
-  private static class TestFileRegion extends AbstractReferenceCounted 
implements FileRegion {
+  private static class TestFileRegion extends AbstractFileRegion {
 
     private final int writeCount;
     private final int writesPerCall;
@@ -130,7 +129,7 @@ public class MessageWithHeaderSuite {
     }
 
     @Override
-    public long transfered() {
+    public long transferred() {
       return 8 * written;
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 97abd92..39249d4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -26,12 +26,11 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable.ListBuffer
 
 import com.google.common.io.Closeables
-import io.netty.channel.{DefaultFileRegion, FileRegion}
-import io.netty.util.AbstractReferenceCounted
+import io.netty.channel.DefaultFileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -266,7 +265,7 @@ private class EncryptedBlockData(
 }
 
 private class ReadableChannelFileRegion(source: ReadableByteChannel, 
blockSize: Long)
-  extends AbstractReferenceCounted with FileRegion {
+  extends AbstractFileRegion {
 
   private var _transferred = 0L
 
@@ -277,7 +276,7 @@ private class ReadableChannelFileRegion(source: 
ReadableByteChannel, blockSize:
 
   override def position(): Long = 0
 
-  override def transfered(): Long = _transferred
+  override def transferred(): Long = _transferred
 
   override def transferTo(target: WritableByteChannel, pos: Long): Long = {
     assert(pos == transfered(), "Invalid position.")

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 1831f33..fea6427 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.4.0.jar
-arrow-memory-0.4.0.jar
-arrow-vector-0.4.0.jar
+arrow-format-0.8.0.jar
+arrow-memory-0.8.0.jar
+arrow-vector-0.8.0.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar
 avro-mapred-1.7.7-hadoop2.jar
@@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.6.5.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
-hppc-0.7.1.jar
+hppc-0.7.2.jar
 htrace-core-3.0.4.jar
 httpclient-4.5.2.jar
 httpcore-4.4.4.jar
@@ -144,7 +144,7 @@ metrics-json-3.1.5.jar
 metrics-jvm-3.1.5.jar
 minlog-1.3.0.jar
 netty-3.9.9.Final.jar
-netty-all-4.0.47.Final.jar
+netty-all-4.1.17.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 orc-core-1.4.1-nohive.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index fe14c05..6dd4433 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.4.0.jar
-arrow-memory-0.4.0.jar
-arrow-vector-0.4.0.jar
+arrow-format-0.8.0.jar
+arrow-memory-0.8.0.jar
+arrow-vector-0.8.0.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar
 avro-mapred-1.7.7-hadoop2.jar
@@ -82,7 +82,7 @@ hadoop-yarn-server-web-proxy-2.7.3.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
-hppc-0.7.1.jar
+hppc-0.7.2.jar
 htrace-core-3.1.0-incubating.jar
 httpclient-4.5.2.jar
 httpcore-4.4.4.jar
@@ -145,7 +145,7 @@ metrics-json-3.1.5.jar
 metrics-jvm-3.1.5.jar
 minlog-1.3.0.jar
 netty-3.9.9.Final.jar
-netty-all-4.0.47.Final.jar
+netty-all-4.1.17.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
 orc-core-1.4.1-nohive.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52db79e..92f8970 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
     <paranamer.version>2.8</paranamer.version>
     <maven-antrun.version>1.8</maven-antrun.version>
     <commons-crypto.version>1.0.0</commons-crypto.version>
-    <arrow.version>0.4.0</arrow.version>
+    <arrow.version>0.8.0</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>
     <test.exclude.tags></test.exclude.tags>
@@ -580,7 +580,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.0.47.Final</version>
+        <version>4.1.17.Final</version>
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
@@ -1974,6 +1974,14 @@
           </exclusion>
           <exclusion>
             <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
             <artifactId>netty-handler</artifactId>
           </exclusion>
         </exclusions>

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 37e7cf3..88d6a19 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -223,27 +223,14 @@ def _create_batch(series, timezone):
         series = [series]
     series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in 
series)
 
-    # If a nullable integer series has been promoted to floating point with 
NaNs, need to cast
-    # NOTE: this is not necessary with Arrow >= 0.7
-    def cast_series(s, t):
-        if type(t) == pa.TimestampType:
-            # NOTE: convert to 'us' with astype here, unit ignored in 
`from_pandas` see ARROW-1680
-            return _check_series_convert_timestamps_internal(s.fillna(0), 
timezone)\
-                .values.astype('datetime64[us]', copy=False)
-        # NOTE: can not compare None with pyarrow.DataType(), fixed with Arrow 
>= 0.7.1
-        elif t is not None and t == pa.date32():
-            # TODO: this converts the series to Python objects, possibly avoid 
with Arrow >= 0.8
-            return s.dt.date
-        elif t is None or s.dtype == t.to_pandas_dtype():
-            return s
-        else:
-            return s.fillna(0).astype(t.to_pandas_dtype(), copy=False)
-
-    # Some object types don't support masks in Arrow, see ARROW-1721
     def create_array(s, t):
-        casted = cast_series(s, t)
-        mask = None if casted.dtype == 'object' else s.isnull()
-        return pa.Array.from_pandas(casted, mask=mask, type=t)
+        mask = s.isnull()
+        # Ensure timestamp series are in expected form for Spark internal 
representation
+        if t is not None and pa.types.is_timestamp(t):
+            s = _check_series_convert_timestamps_internal(s.fillna(0), 
timezone)
+            # TODO: need cast after Arrow conversion, ns values cause error 
with pandas 0.19.2
+            return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
+        return pa.Array.from_pandas(s, mask=mask, type=t)
 
     arrs = [create_array(s, t) for s, t in series]
     return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
xrange(len(arrs))])

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 75395a7..440684d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1906,7 +1906,9 @@ class DataFrame(object):
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", 
"false").lower() == "true":
             try:
                 from pyspark.sql.types import 
_check_dataframe_localize_timestamps
+                from pyspark.sql.utils import _require_minimum_pyarrow_version
                 import pyarrow
+                _require_minimum_pyarrow_version()
                 tables = self._collectAsArrow()
                 if tables:
                     table = pyarrow.concat_tables(tables)

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5453005..ddd8df3 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2159,16 +2159,17 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
 
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> from pyspark.sql.types import IntegerType, StringType
-       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
-       >>> @pandas_udf(StringType())
+       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())  # doctest: 
+SKIP
+       >>> @pandas_udf(StringType())  # doctest: +SKIP
        ... def to_upper(s):
        ...     return s.str.upper()
        ...
-       >>> @pandas_udf("integer", PandasUDFType.SCALAR)
+       >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
        ... def add_one(x):
        ...     return x + 1
        ...
-       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
+       >>> df = spark.createDataFrame([(1, "John Doe", 21)],
+       ...                            ("id", "name", "age"))  # doctest: +SKIP
        >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
        ...     .show()  # doctest: +SKIP
        +----------+--------------+------------+
@@ -2189,8 +2190,8 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+       ...     ("id", "v"))  # doctest: +SKIP
+       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # 
doctest: +SKIP
        ... def normalize(pdf):
        ...     v = pdf.v
        ...     return pdf.assign(v=(v - v.mean()) / v.std())

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/group.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 4d47dd6..09fae46 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -218,7 +218,7 @@ class GroupedData(object):
         >>> df = spark.createDataFrame(
         ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ...     ("id", "v"))
-        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # 
doctest: +SKIP
         ... def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index e2435e0..86db16e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -495,11 +495,14 @@ class SparkSession(object):
         from pyspark.serializers import ArrowSerializer, _create_batch
         from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
             _old_pandas_exception_message, TimestampType
+        from pyspark.sql.utils import _require_minimum_pyarrow_version
         try:
             from pandas.api.types import is_datetime64_dtype, 
is_datetime64tz_dtype
         except ImportError as e:
             raise ImportError(_old_pandas_exception_message(e))
 
+        _require_minimum_pyarrow_version()
+
         # Determine arrow types to coerce data when creating batches
         if isinstance(schema, StructType):
             arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b4d32d8..6fdfda1 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3339,10 +3339,11 @@ class ArrowTests(ReusedSQLTestCase):
                 self.spark.createDataFrame(pd.DataFrame({"a": [1]}), 
schema="int")
 
     def test_createDataFrame_does_not_modify_input(self):
+        import pandas as pd
         # Some series get converted for Spark to consume, this makes sure 
input is unchanged
         pdf = self.create_pandas_data_frame()
         # Use a nanosecond value to make sure it is not truncated
-        pdf.ix[0, '7_timestamp_t'] = 1
+        pdf.ix[0, '7_timestamp_t'] = pd.Timestamp(1)
         # Integers with nulls will get NaNs filled with 0 and will be casted
         pdf.ix[1, '2_int_t'] = None
         pdf_copy = pdf.copy(deep=True)
@@ -3356,6 +3357,7 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEquals(self.schema, schema_rt)
 
 
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
 class PandasUDFTests(ReusedSQLTestCase):
     def test_pandas_udf_basic(self):
         from pyspark.rdd import PythonEvalType
@@ -3671,9 +3673,9 @@ class VectorizedUDFTests(ReusedSQLTestCase):
     def test_vectorized_udf_wrong_return_type(self):
         from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
-        f = pandas_udf(lambda x: x * 1.0, StringType())
+        f = pandas_udf(lambda x: x * 1.0, ArrayType(LongType()))
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Invalid.*type'):
+            with self.assertRaisesRegexp(Exception, 
'Unsupported.*type.*conversion'):
                 df.select(f(col('id'))).collect()
 
     def test_vectorized_udf_return_scalar(self):
@@ -3974,12 +3976,12 @@ class GroupbyApplyTests(ReusedSQLTestCase):
 
         foo = pandas_udf(
             lambda pdf: pdf,
-            'id long, v string',
+            'id long, v array<int>',
             PandasUDFType.GROUP_MAP
         )
 
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Invalid.*type'):
+            with self.assertRaisesRegexp(Exception, 
'Unsupported.*type.*conversion'):
                 df.groupby('id').apply(foo).sort('id').toPandas()
 
     def test_wrong_args(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 78abc32..46d9a41 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1642,29 +1642,28 @@ def to_arrow_schema(schema):
 def from_arrow_type(at):
     """ Convert pyarrow type to Spark data type.
     """
-    # TODO: newer pyarrow has is_boolean(at) functions that would be better to 
check type
-    import pyarrow as pa
-    if at == pa.bool_():
+    import pyarrow.types as types
+    if types.is_boolean(at):
         spark_type = BooleanType()
-    elif at == pa.int8():
+    elif types.is_int8(at):
         spark_type = ByteType()
-    elif at == pa.int16():
+    elif types.is_int16(at):
         spark_type = ShortType()
-    elif at == pa.int32():
+    elif types.is_int32(at):
         spark_type = IntegerType()
-    elif at == pa.int64():
+    elif types.is_int64(at):
         spark_type = LongType()
-    elif at == pa.float32():
+    elif types.is_float32(at):
         spark_type = FloatType()
-    elif at == pa.float64():
+    elif types.is_float64(at):
         spark_type = DoubleType()
-    elif type(at) == pa.DecimalType:
+    elif types.is_decimal(at):
         spark_type = DecimalType(precision=at.precision, scale=at.scale)
-    elif at == pa.string():
+    elif types.is_string(at):
         spark_type = StringType()
-    elif at == pa.date32():
+    elif types.is_date32(at):
         spark_type = DateType()
-    elif type(at) == pa.TimestampType:
+    elif types.is_timestamp(at):
         spark_type = TimestampType()
     else:
         raise TypeError("Unsupported type in conversion from Arrow: " + 
str(at))

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index c3301a4..50c87ba 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -33,19 +33,23 @@ def _wrap_function(sc, func, returnType):
 
 
 def _create_udf(f, returnType, evalType):
-    if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF:
+
+    if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF or \
+            evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
         import inspect
+        from pyspark.sql.utils import _require_minimum_pyarrow_version
+
+        _require_minimum_pyarrow_version()
         argspec = inspect.getargspec(f)
-        if len(argspec.args) == 0 and argspec.varargs is None:
+
+        if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF and 
len(argspec.args) == 0 and \
+                argspec.varargs is None:
             raise ValueError(
                 "Invalid function: 0-arg pandas_udfs are not supported. "
                 "Instead, create a 1-arg pandas_udf and ignore the arg in your 
function."
             )
 
-    elif evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
-        import inspect
-        argspec = inspect.getargspec(f)
-        if len(argspec.args) != 1:
+        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and 
len(argspec.args) != 1:
             raise ValueError(
                 "Invalid function: pandas_udfs with function type GROUP_MAP "
                 "must take a single arg that is a pandas DataFrame."

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/python/pyspark/sql/utils.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 7bc6a59..cc7dabb 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -110,3 +110,12 @@ def toJArray(gateway, jtype, arr):
     for i in range(0, len(arr)):
         jarr[i] = arr[i]
     return jarr
+
+
+def _require_minimum_pyarrow_version():
+    """ Raise ImportError if minimum version of pyarrow is not installed
+    """
+    from distutils.version import LooseVersion
+    import pyarrow
+    if LooseVersion(pyarrow.__version__) < LooseVersion('0.8.0'):
+        raise ImportError("pyarrow >= 0.8.0 must be installed on calling 
Python process")

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index d53e1fc..528f66f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -275,30 +275,30 @@ public final class ArrowColumnVector extends ColumnVector 
{
   public ArrowColumnVector(ValueVector vector) {
     super(ArrowUtils.fromArrowField(vector.getField()));
 
-    if (vector instanceof NullableBitVector) {
-      accessor = new BooleanAccessor((NullableBitVector) vector);
-    } else if (vector instanceof NullableTinyIntVector) {
-      accessor = new ByteAccessor((NullableTinyIntVector) vector);
-    } else if (vector instanceof NullableSmallIntVector) {
-      accessor = new ShortAccessor((NullableSmallIntVector) vector);
-    } else if (vector instanceof NullableIntVector) {
-      accessor = new IntAccessor((NullableIntVector) vector);
-    } else if (vector instanceof NullableBigIntVector) {
-      accessor = new LongAccessor((NullableBigIntVector) vector);
-    } else if (vector instanceof NullableFloat4Vector) {
-      accessor = new FloatAccessor((NullableFloat4Vector) vector);
-    } else if (vector instanceof NullableFloat8Vector) {
-      accessor = new DoubleAccessor((NullableFloat8Vector) vector);
-    } else if (vector instanceof NullableDecimalVector) {
-      accessor = new DecimalAccessor((NullableDecimalVector) vector);
-    } else if (vector instanceof NullableVarCharVector) {
-      accessor = new StringAccessor((NullableVarCharVector) vector);
-    } else if (vector instanceof NullableVarBinaryVector) {
-      accessor = new BinaryAccessor((NullableVarBinaryVector) vector);
-    } else if (vector instanceof NullableDateDayVector) {
-      accessor = new DateAccessor((NullableDateDayVector) vector);
-    } else if (vector instanceof NullableTimeStampMicroTZVector) {
-      accessor = new TimestampAccessor((NullableTimeStampMicroTZVector) 
vector);
+    if (vector instanceof BitVector) {
+      accessor = new BooleanAccessor((BitVector) vector);
+    } else if (vector instanceof TinyIntVector) {
+      accessor = new ByteAccessor((TinyIntVector) vector);
+    } else if (vector instanceof SmallIntVector) {
+      accessor = new ShortAccessor((SmallIntVector) vector);
+    } else if (vector instanceof IntVector) {
+      accessor = new IntAccessor((IntVector) vector);
+    } else if (vector instanceof BigIntVector) {
+      accessor = new LongAccessor((BigIntVector) vector);
+    } else if (vector instanceof Float4Vector) {
+      accessor = new FloatAccessor((Float4Vector) vector);
+    } else if (vector instanceof Float8Vector) {
+      accessor = new DoubleAccessor((Float8Vector) vector);
+    } else if (vector instanceof DecimalVector) {
+      accessor = new DecimalAccessor((DecimalVector) vector);
+    } else if (vector instanceof VarCharVector) {
+      accessor = new StringAccessor((VarCharVector) vector);
+    } else if (vector instanceof VarBinaryVector) {
+      accessor = new BinaryAccessor((VarBinaryVector) vector);
+    } else if (vector instanceof DateDayVector) {
+      accessor = new DateAccessor((DateDayVector) vector);
+    } else if (vector instanceof TimeStampMicroTZVector) {
+      accessor = new TimestampAccessor((TimeStampMicroTZVector) vector);
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
@@ -321,23 +321,21 @@ public final class ArrowColumnVector extends ColumnVector 
{
   private abstract static class ArrowVectorAccessor {
 
     private final ValueVector vector;
-    private final ValueVector.Accessor nulls;
 
     ArrowVectorAccessor(ValueVector vector) {
       this.vector = vector;
-      this.nulls = vector.getAccessor();
     }
 
     final boolean isNullAt(int rowId) {
-      return nulls.isNull(rowId);
+      return vector.isNull(rowId);
     }
 
     final int getValueCount() {
-      return nulls.getValueCount();
+      return vector.getValueCount();
     }
 
     final int getNullCount() {
-      return nulls.getNullCount();
+      return vector.getNullCount();
     }
 
     final void close() {
@@ -395,11 +393,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class BooleanAccessor extends ArrowVectorAccessor {
 
-    private final NullableBitVector.Accessor accessor;
+    private final BitVector accessor;
 
-    BooleanAccessor(NullableBitVector vector) {
+    BooleanAccessor(BitVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -410,11 +408,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class ByteAccessor extends ArrowVectorAccessor {
 
-    private final NullableTinyIntVector.Accessor accessor;
+    private final TinyIntVector accessor;
 
-    ByteAccessor(NullableTinyIntVector vector) {
+    ByteAccessor(TinyIntVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -425,11 +423,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class ShortAccessor extends ArrowVectorAccessor {
 
-    private final NullableSmallIntVector.Accessor accessor;
+    private final SmallIntVector accessor;
 
-    ShortAccessor(NullableSmallIntVector vector) {
+    ShortAccessor(SmallIntVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -440,11 +438,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class IntAccessor extends ArrowVectorAccessor {
 
-    private final NullableIntVector.Accessor accessor;
+    private final IntVector accessor;
 
-    IntAccessor(NullableIntVector vector) {
+    IntAccessor(IntVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -455,11 +453,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class LongAccessor extends ArrowVectorAccessor {
 
-    private final NullableBigIntVector.Accessor accessor;
+    private final BigIntVector accessor;
 
-    LongAccessor(NullableBigIntVector vector) {
+    LongAccessor(BigIntVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -470,11 +468,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class FloatAccessor extends ArrowVectorAccessor {
 
-    private final NullableFloat4Vector.Accessor accessor;
+    private final Float4Vector accessor;
 
-    FloatAccessor(NullableFloat4Vector vector) {
+    FloatAccessor(Float4Vector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -485,11 +483,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class DoubleAccessor extends ArrowVectorAccessor {
 
-    private final NullableFloat8Vector.Accessor accessor;
+    private final Float8Vector accessor;
 
-    DoubleAccessor(NullableFloat8Vector vector) {
+    DoubleAccessor(Float8Vector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -500,11 +498,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class DecimalAccessor extends ArrowVectorAccessor {
 
-    private final NullableDecimalVector.Accessor accessor;
+    private final DecimalVector accessor;
 
-    DecimalAccessor(NullableDecimalVector vector) {
+    DecimalAccessor(DecimalVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -516,12 +514,12 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class StringAccessor extends ArrowVectorAccessor {
 
-    private final NullableVarCharVector.Accessor accessor;
+    private final VarCharVector accessor;
     private final NullableVarCharHolder stringResult = new 
NullableVarCharHolder();
 
-    StringAccessor(NullableVarCharVector vector) {
+    StringAccessor(VarCharVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -539,11 +537,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class BinaryAccessor extends ArrowVectorAccessor {
 
-    private final NullableVarBinaryVector.Accessor accessor;
+    private final VarBinaryVector accessor;
 
-    BinaryAccessor(NullableVarBinaryVector vector) {
+    BinaryAccessor(VarBinaryVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -554,11 +552,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class DateAccessor extends ArrowVectorAccessor {
 
-    private final NullableDateDayVector.Accessor accessor;
+    private final DateDayVector accessor;
 
-    DateAccessor(NullableDateDayVector vector) {
+    DateAccessor(DateDayVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -569,11 +567,11 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class TimestampAccessor extends ArrowVectorAccessor {
 
-    private final NullableTimeStampMicroTZVector.Accessor accessor;
+    private final TimeStampMicroTZVector accessor;
 
-    TimestampAccessor(NullableTimeStampMicroTZVector vector) {
+    TimestampAccessor(TimeStampMicroTZVector vector) {
       super(vector);
-      this.accessor = vector.getAccessor();
+      this.accessor = vector;
     }
 
     @Override
@@ -584,21 +582,21 @@ public final class ArrowColumnVector extends ColumnVector 
{
 
   private static class ArrayAccessor extends ArrowVectorAccessor {
 
-    private final UInt4Vector.Accessor accessor;
+    private final ListVector accessor;
 
     ArrayAccessor(ListVector vector) {
       super(vector);
-      this.accessor = vector.getOffsetVector().getAccessor();
+      this.accessor = vector;
     }
 
     @Override
     final int getArrayLength(int rowId) {
-      return accessor.get(rowId + 1) - accessor.get(rowId);
+      return accessor.getInnerValueCountAt(rowId);
     }
 
     @Override
     final int getArrayOffset(int rowId) {
-      return accessor.get(rowId);
+      return accessor.getOffsetBuffer().getInt(rowId * accessor.OFFSET_WIDTH);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 3cafb34..bcfc412 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -24,8 +24,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector._
-import org.apache.arrow.vector.file._
-import org.apache.arrow.vector.schema.ArrowRecordBatch
+import org.apache.arrow.vector.ipc.{ArrowFileReader, ArrowFileWriter}
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
 import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
 
 import org.apache.spark.TaskContext
@@ -86,13 +86,9 @@ private[sql] object ArrowConverters {
     val root = VectorSchemaRoot.create(arrowSchema, allocator)
     val arrowWriter = ArrowWriter.create(root)
 
-    var closed = false
-
     context.addTaskCompletionListener { _ =>
-      if (!closed) {
-        root.close()
-        allocator.close()
-      }
+      root.close()
+      allocator.close()
     }
 
     new Iterator[ArrowPayload] {
@@ -100,7 +96,6 @@ private[sql] object ArrowConverters {
       override def hasNext: Boolean = rowIter.hasNext || {
         root.close()
         allocator.close()
-        closed = true
         false
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index e4af4f6..0258056 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -46,17 +46,17 @@ object ArrowWriter {
   private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
     val field = vector.getField()
     (ArrowUtils.fromArrowField(field), vector) match {
-      case (BooleanType, vector: NullableBitVector) => new 
BooleanWriter(vector)
-      case (ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector)
-      case (ShortType, vector: NullableSmallIntVector) => new 
ShortWriter(vector)
-      case (IntegerType, vector: NullableIntVector) => new 
IntegerWriter(vector)
-      case (LongType, vector: NullableBigIntVector) => new LongWriter(vector)
-      case (FloatType, vector: NullableFloat4Vector) => new FloatWriter(vector)
-      case (DoubleType, vector: NullableFloat8Vector) => new 
DoubleWriter(vector)
-      case (StringType, vector: NullableVarCharVector) => new 
StringWriter(vector)
-      case (BinaryType, vector: NullableVarBinaryVector) => new 
BinaryWriter(vector)
-      case (DateType, vector: NullableDateDayVector) => new DateWriter(vector)
-      case (TimestampType, vector: NullableTimeStampMicroTZVector) => new 
TimestampWriter(vector)
+      case (BooleanType, vector: BitVector) => new BooleanWriter(vector)
+      case (ByteType, vector: TinyIntVector) => new ByteWriter(vector)
+      case (ShortType, vector: SmallIntVector) => new ShortWriter(vector)
+      case (IntegerType, vector: IntVector) => new IntegerWriter(vector)
+      case (LongType, vector: BigIntVector) => new LongWriter(vector)
+      case (FloatType, vector: Float4Vector) => new FloatWriter(vector)
+      case (DoubleType, vector: Float8Vector) => new DoubleWriter(vector)
+      case (StringType, vector: VarCharVector) => new StringWriter(vector)
+      case (BinaryType, vector: VarBinaryVector) => new BinaryWriter(vector)
+      case (DateType, vector: DateDayVector) => new DateWriter(vector)
+      case (TimestampType, vector: TimeStampMicroTZVector) => new 
TimestampWriter(vector)
       case (ArrayType(_, _), vector: ListVector) =>
         val elementVector = createFieldWriter(vector.getDataVector())
         new ArrayWriter(vector, elementVector)
@@ -103,7 +103,6 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: 
Array[ArrowFieldWriter]) {
 private[arrow] abstract class ArrowFieldWriter {
 
   def valueVector: ValueVector
-  def valueMutator: ValueVector.Mutator
 
   def name: String = valueVector.getField().getName()
   def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
@@ -124,161 +123,144 @@ private[arrow] abstract class ArrowFieldWriter {
   }
 
   def finish(): Unit = {
-    valueMutator.setValueCount(count)
+    valueVector.setValueCount(count)
   }
 
   def reset(): Unit = {
-    valueMutator.reset()
+    // TODO: reset() should be in a common interface
+    valueVector match {
+      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
+      case variableWidthVector: BaseVariableWidthVector => 
variableWidthVector.reset()
+      case _ =>
+    }
     count = 0
   }
 }
 
-private[arrow] class BooleanWriter(val valueVector: NullableBitVector) extends 
ArrowFieldWriter {
-
-  override def valueMutator: NullableBitVector#Mutator = 
valueVector.getMutator()
+private[arrow] class BooleanWriter(val valueVector: BitVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, if (input.getBoolean(ordinal)) 1 else 0)
+    valueVector.setSafe(count, if (input.getBoolean(ordinal)) 1 else 0)
   }
 }
 
-private[arrow] class ByteWriter(val valueVector: NullableTinyIntVector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableTinyIntVector#Mutator = 
valueVector.getMutator()
+private[arrow] class ByteWriter(val valueVector: TinyIntVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getByte(ordinal))
+    valueVector.setSafe(count, input.getByte(ordinal))
   }
 }
 
-private[arrow] class ShortWriter(val valueVector: NullableSmallIntVector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableSmallIntVector#Mutator = 
valueVector.getMutator()
+private[arrow] class ShortWriter(val valueVector: SmallIntVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getShort(ordinal))
+    valueVector.setSafe(count, input.getShort(ordinal))
   }
 }
 
-private[arrow] class IntegerWriter(val valueVector: NullableIntVector) extends 
ArrowFieldWriter {
-
-  override def valueMutator: NullableIntVector#Mutator = 
valueVector.getMutator()
+private[arrow] class IntegerWriter(val valueVector: IntVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getInt(ordinal))
+    valueVector.setSafe(count, input.getInt(ordinal))
   }
 }
 
-private[arrow] class LongWriter(val valueVector: NullableBigIntVector) extends 
ArrowFieldWriter {
-
-  override def valueMutator: NullableBigIntVector#Mutator = 
valueVector.getMutator()
+private[arrow] class LongWriter(val valueVector: BigIntVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getLong(ordinal))
+    valueVector.setSafe(count, input.getLong(ordinal))
   }
 }
 
-private[arrow] class FloatWriter(val valueVector: NullableFloat4Vector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableFloat4Vector#Mutator = 
valueVector.getMutator()
+private[arrow] class FloatWriter(val valueVector: Float4Vector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getFloat(ordinal))
+    valueVector.setSafe(count, input.getFloat(ordinal))
   }
 }
 
-private[arrow] class DoubleWriter(val valueVector: NullableFloat8Vector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableFloat8Vector#Mutator = 
valueVector.getMutator()
+private[arrow] class DoubleWriter(val valueVector: Float8Vector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getDouble(ordinal))
+    valueVector.setSafe(count, input.getDouble(ordinal))
   }
 }
 
-private[arrow] class StringWriter(val valueVector: NullableVarCharVector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableVarCharVector#Mutator = 
valueVector.getMutator()
+private[arrow] class StringWriter(val valueVector: VarCharVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
     val utf8 = input.getUTF8String(ordinal)
     val utf8ByteBuffer = utf8.getByteBuffer
     // todo: for off-heap UTF8String, how to pass in to arrow without copy?
-    valueMutator.setSafe(count, utf8ByteBuffer, utf8ByteBuffer.position(), 
utf8.numBytes())
+    valueVector.setSafe(count, utf8ByteBuffer, utf8ByteBuffer.position(), 
utf8.numBytes())
   }
 }
 
 private[arrow] class BinaryWriter(
-    val valueVector: NullableVarBinaryVector) extends ArrowFieldWriter {
-
-  override def valueMutator: NullableVarBinaryVector#Mutator = 
valueVector.getMutator()
+    val valueVector: VarBinaryVector) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
     val bytes = input.getBinary(ordinal)
-    valueMutator.setSafe(count, bytes, 0, bytes.length)
+    valueVector.setSafe(count, bytes, 0, bytes.length)
   }
 }
 
-private[arrow] class DateWriter(val valueVector: NullableDateDayVector) 
extends ArrowFieldWriter {
-
-  override def valueMutator: NullableDateDayVector#Mutator = 
valueVector.getMutator()
+private[arrow] class DateWriter(val valueVector: DateDayVector) extends 
ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getInt(ordinal))
+    valueVector.setSafe(count, input.getInt(ordinal))
   }
 }
 
 private[arrow] class TimestampWriter(
-    val valueVector: NullableTimeStampMicroTZVector) extends ArrowFieldWriter {
-
-  override def valueMutator: NullableTimeStampMicroTZVector#Mutator = 
valueVector.getMutator()
+    val valueVector: TimeStampMicroTZVector) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
-    valueMutator.setSafe(count, input.getLong(ordinal))
+    valueVector.setSafe(count, input.getLong(ordinal))
   }
 }
 
@@ -286,20 +268,18 @@ private[arrow] class ArrayWriter(
     val valueVector: ListVector,
     val elementWriter: ArrowFieldWriter) extends ArrowFieldWriter {
 
-  override def valueMutator: ListVector#Mutator = valueVector.getMutator()
-
   override def setNull(): Unit = {
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
     val array = input.getArray(ordinal)
     var i = 0
-    valueMutator.startNewValue(count)
+    valueVector.startNewValue(count)
     while (i < array.numElements()) {
       elementWriter.write(array, i)
       i += 1
     }
-    valueMutator.endValue(count, array.numElements())
+    valueVector.endValue(count, array.numElements())
   }
 
   override def finish(): Unit = {
@@ -317,8 +297,6 @@ private[arrow] class StructWriter(
     val valueVector: NullableMapVector,
     children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {
 
-  override def valueMutator: NullableMapVector#Mutator = 
valueVector.getMutator()
-
   override def setNull(): Unit = {
     var i = 0
     while (i < children.length) {
@@ -326,7 +304,7 @@ private[arrow] class StructWriter(
       children(i).count += 1
       i += 1
     }
-    valueMutator.setNull(count)
+    valueVector.setNull(count)
   }
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
@@ -336,7 +314,7 @@ private[arrow] class StructWriter(
       children(i).write(struct, i)
       i += 1
     }
-    valueMutator.setIndexDefined(count)
+    valueVector.setIndexDefined(count)
   }
 
   override def finish(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/59d52631/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 9a94d77..5cc8ed3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 
 import org.apache.arrow.vector.VectorSchemaRoot
-import org.apache.arrow.vector.stream.{ArrowStreamReader, ArrowStreamWriter}
+import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
 
 import org.apache.spark._
 import org.apache.spark.api.python._
@@ -74,13 +74,9 @@ class ArrowPythonRunner(
         val root = VectorSchemaRoot.create(arrowSchema, allocator)
         val arrowWriter = ArrowWriter.create(root)
 
-        var closed = false
-
         context.addTaskCompletionListener { _ =>
-          if (!closed) {
-            root.close()
-            allocator.close()
-          }
+          root.close()
+          allocator.close()
         }
 
         val writer = new ArrowStreamWriter(root, null, dataOut)
@@ -102,7 +98,6 @@ class ArrowPythonRunner(
           writer.end()
           root.close()
           allocator.close()
-          closed = true
         }
       }
     }
@@ -126,18 +121,11 @@ class ArrowPythonRunner(
       private var schema: StructType = _
       private var vectors: Array[ColumnVector] = _
 
-      private var closed = false
-
       context.addTaskCompletionListener { _ =>
-        // todo: we need something like `reader.end()`, which release all the 
resources, but leave
-        // the input stream open. `reader.close()` will close the socket and 
we can't reuse worker.
-        // So here we simply not close the reader, which is problematic.
-        if (!closed) {
-          if (root != null) {
-            root.close()
-          }
-          allocator.close()
+        if (reader != null) {
+          reader.close(false)
         }
+        allocator.close()
       }
 
       private var batchLoaded = true
@@ -154,9 +142,8 @@ class ArrowPythonRunner(
               batch.setNumRows(root.getRowCount)
               batch
             } else {
-              root.close()
+              reader.close(false)
               allocator.close()
-              closed = true
               // Reach end of stream. Call `read()` again to read control data.
               read()
             }

