[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18655


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-26 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129735129
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
-
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+var closed = false
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
--- End diff --

I filed https://issues.apache.org/jira/browse/ARROW-1283 to fix this. For now, it looks like we need this guard.
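
For reference, here is a minimal sketch of the guarded cleanup under discussion, using the `context`, `closed`, `root`, and `allocator` names from the quoted diff (the diff is truncated above, so this is an assumed reconstruction rather than the exact PR code):

```scala
context.addTaskCompletionListener { _ =>
  // Guard against a second cleanup: closing the root after its allocator has
  // already been closed throws (see ARROW-1283), so only run this once.
  if (!closed) {
    root.close()
    allocator.close()
    closed = true
  }
}
```

Once ARROW-1283 makes the Arrow close calls safe to repeat, the flag could presumably be dropped.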



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129488362
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "nanData-floating_point.json")
   }
 
+  test("array type conversion") {
+val json =
+  s"""
+ |{
+ |  "schema" : {
+ |"fields" : [ {
+ |  "name" : "a_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : false,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "b_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : false,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "c_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : true,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "d_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : true,
+ |"type" : {
+ |  "name" : "list"
+ |},
+ |"children" : [ {
+ |  "name" : "element",
+ |  "nullable" : false,
+ |  "type" : {
+ |"name" : "int",
+ |

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129487716
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala 
---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
--- End diff --

Thanks, I'll modify it.



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129486273
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.vectorized.ArrowColumnVector
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowWriterSuite extends SparkFunSuite {
+
+  test("simple") {
+def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) 
=> Any): Unit = {
+  val schema = new StructType().add("value", dt, nullable = true)
+  val writer = ArrowWriter.create(schema)
+  assert(writer.schema === schema)
+
+  data.foreach { datum =>
+writer.write(InternalRow(datum))
+  }
+  writer.finish()
+
+  val reader = new 
ArrowColumnVector(writer.root.getFieldVectors().get(0))
+  data.zipWithIndex.foreach {
+case (null, rowId) => assert(reader.isNullAt(rowId))
+case (datum, rowId) => assert(get(reader, rowId) === datum)
--- End diff --

Thanks, I'll update it.



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129486171
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala 
---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
+  case BooleanType =>
+new BooleanWriter(vector.asInstanceOf[NullableBitVector])
+  case ByteType =>
+new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
+  case ShortType =>
+new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
+  case IntegerType =>
+new IntegerWriter(vector.asInstanceOf[NullableIntVector])
+  case LongType =>
+new LongWriter(vector.asInstanceOf[NullableBigIntVector])
+  case FloatType =>
+new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
+  case DoubleType =>
+new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
+  case StringType =>
+new StringWriter(vector.asInstanceOf[NullableVarCharVector])
+  case BinaryType =>
+new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
+  case ArrayType(_, _) =>
+val v = vector.asInstanceOf[ListVector]
+val elementVector = createFieldWriter(v.getDataVector())
+new ArrayWriter(v, elementVector)
+  case StructType(_) =>
+val v = vector.asInstanceOf[NullableMapVector]
+val children = (0 until v.size()).map { ordinal =>
+  createFieldWriter(v.getChildByOrdinal(ordinal))
+}
+new StructWriter(v, children.toArray)
+  case dt =>
+throw new UnsupportedOperationException(s"Unsupported data type: 
${dt.simpleString}")
+}
+  }
+}
+
+class ArrowWriter(
+val root: VectorSchemaRoot,
+fields: Array[ArrowFieldWriter]) {
+
+  def schema: StructType = StructType(fields.map { f =>
+StructField(f.name, f.dataType, f.nullable)
+  })
+
+  private var count: Int = 0
+
+  def write(row: InternalRow): Unit = {
+var i = 0
+while (i < fields.size) {
+  fields(i).write(row, i)
+  i += 1
+}
+count += 1
+  }
+
+  def finish(): Unit = {
+root.setRowCount(count)
+fields.foreach(_.finish())
+  }
+
+  def reset(): Unit = {
+root.setRowCount(0)
+count = 0
+fields.foreach(_.reset())
+  }
+}
+
+private[arrow] abstract class ArrowFieldWriter {
+
+  def valueVector: ValueVector
+  def valueMutator: ValueVector.Mutator
+
+  def name: String = valueVector.getField().getName()
+  def dataType: DataType = 
ArrowUtils.fromArrowField(valueVector.getField())
+  def nullable: Boolean = valueVector.getField().isNullable()
+
+  def setNull(): Unit
+  def setValue(input: 

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129484715
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
-
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+var closed = false
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
--- End diff --

The root just releases the buffers from the FieldVectors, so I would think it should be able to handle being closed twice. I'll check tomorrow whether that seems reasonable.
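
A hypothetical way to check that in isolation (not part of the PR), built from the same Arrow APIs that appear in the quoted code:

```scala
import java.util.Collections

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

// A one-column int schema, just to have something to allocate against.
val field = new Field("i", true, new ArrowType.Int(32, true), Collections.emptyList[Field]())
val arrowSchema = new Schema(Collections.singletonList(field))

val allocator = new RootAllocator(Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)

root.close()       // releases the field vectors' buffers back to the allocator
root.close()       // if close is effectively idempotent, this second call is harmless
allocator.close()  // succeeds only if every buffer has been released
```

The ordering the `closed` flag protects against is different, though: a second `root.close()` issued after `allocator.close()`, as noted in the other comments on this line.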



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129482346
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "nanData-floating_point.json")
   }
 
+  test("array type conversion") {
+val json =
+  s"""
+ |{
+ |  "schema" : {
+ |"fields" : [ {
+ |  "name" : "a_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : false,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "b_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : false,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "c_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : true,
+ |"type" : {
+ |  "name" : "int",
+ |  "bitWidth" : 32,
+ |  "isSigned" : true
+ |},
+ |"children" : [ ],
+ |"typeLayout" : {
+ |  "vectors" : [ {
+ |"type" : "VALIDITY",
+ |"typeBitWidth" : 1
+ |  }, {
+ |"type" : "DATA",
+ |"typeBitWidth" : 32
+ |  } ]
+ |}
+ |  } ],
+ |  "typeLayout" : {
+ |"vectors" : [ {
+ |  "type" : "VALIDITY",
+ |  "typeBitWidth" : 1
+ |}, {
+ |  "type" : "OFFSET",
+ |  "typeBitWidth" : 32
+ |} ]
+ |  }
+ |}, {
+ |  "name" : "d_arr",
+ |  "nullable" : true,
+ |  "type" : {
+ |"name" : "list"
+ |  },
+ |  "children" : [ {
+ |"name" : "element",
+ |"nullable" : true,
+ |"type" : {
+ |  "name" : "list"
+ |},
+ |"children" : [ {
+ |  "name" : "element",
+ |  "nullable" : false,
+ |  "type" : {
+ |"name" : "int",
+ |   

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129481638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala 
---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
--- End diff --

Would it be better to do as below?

```scala
(ArrowUtils.fromArrowField(field), vector) match {
  case (_: BooleanType, vector: NullableBitVector) => new BooleanWriter(vector)
  case (_: ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector)
  ...
```
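
Spelled out for the full set of types handled in the quoted `createFieldWriter` (purely illustrative, assuming the same writer classes as in the diff; the actual change may differ):

```scala
(ArrowUtils.fromArrowField(field), vector) match {
  case (BooleanType, vector: NullableBitVector) => new BooleanWriter(vector)
  case (ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector)
  case (ShortType, vector: NullableSmallIntVector) => new ShortWriter(vector)
  case (IntegerType, vector: NullableIntVector) => new IntegerWriter(vector)
  case (LongType, vector: NullableBigIntVector) => new LongWriter(vector)
  case (FloatType, vector: NullableFloat4Vector) => new FloatWriter(vector)
  case (DoubleType, vector: NullableFloat8Vector) => new DoubleWriter(vector)
  case (StringType, vector: NullableVarCharVector) => new StringWriter(vector)
  case (BinaryType, vector: NullableVarBinaryVector) => new BinaryWriter(vector)
  case (ArrayType(_, _), vector: ListVector) =>
    new ArrayWriter(vector, createFieldWriter(vector.getDataVector()))
  case (StructType(_), vector: NullableMapVector) =>
    val children = (0 until vector.size()).map(i => createFieldWriter(vector.getChildByOrdinal(i)))
    new StructWriter(vector, children.toArray)
  case (dt, _) =>
    throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
}
```

This removes the explicit casts, so a mismatched vector type falls through to the `UnsupportedOperationException` case instead of failing with a `ClassCastException`.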



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129480466
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.vectorized.ArrowColumnVector
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ArrowWriterSuite extends SparkFunSuite {
+
+  test("simple") {
+def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) 
=> Any): Unit = {
+  val schema = new StructType().add("value", dt, nullable = true)
+  val writer = ArrowWriter.create(schema)
+  assert(writer.schema === schema)
+
+  data.foreach { datum =>
+writer.write(InternalRow(datum))
+  }
+  writer.finish()
+
+  val reader = new 
ArrowColumnVector(writer.root.getFieldVectors().get(0))
+  data.zipWithIndex.foreach {
+case (null, rowId) => assert(reader.isNullAt(rowId))
+case (datum, rowId) => assert(get(reader, rowId) === datum)
--- End diff --

we can do something like
```scala
dt match {
  case BooleanType => reader.getBoolean(rowId)
  case IntegerType => ...
  ...
}
```
Then the caller side doesn't need to pass in a `get`.
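
For illustration, the suite could dispatch on the data type with a helper like the following (a sketch that assumes the typed getters exposed by `ArrowColumnVector`/`ColumnVector` and the suite's existing imports; not the actual test code):

```scala
def get(reader: ArrowColumnVector, dt: DataType, rowId: Int): Any = dt match {
  case BooleanType => reader.getBoolean(rowId)
  case ByteType    => reader.getByte(rowId)
  case ShortType   => reader.getShort(rowId)
  case IntegerType => reader.getInt(rowId)
  case LongType    => reader.getLong(rowId)
  case FloatType   => reader.getFloat(rowId)
  case DoubleType  => reader.getDouble(rowId)
  case StringType  => reader.getUTF8String(rowId)
  case BinaryType  => reader.getBinary(rowId)
  case other       => throw new UnsupportedOperationException(s"no getter for $other")
}
```

The per-row check then becomes `assert(get(reader, dt, rowId) === datum)`, and `check` no longer needs a `get` parameter.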



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129480291
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala 
---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.arrow
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
+  case BooleanType =>
+new BooleanWriter(vector.asInstanceOf[NullableBitVector])
+  case ByteType =>
+new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
+  case ShortType =>
+new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
+  case IntegerType =>
+new IntegerWriter(vector.asInstanceOf[NullableIntVector])
+  case LongType =>
+new LongWriter(vector.asInstanceOf[NullableBigIntVector])
+  case FloatType =>
+new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
+  case DoubleType =>
+new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
+  case StringType =>
+new StringWriter(vector.asInstanceOf[NullableVarCharVector])
+  case BinaryType =>
+new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
+  case ArrayType(_, _) =>
+val v = vector.asInstanceOf[ListVector]
+val elementVector = createFieldWriter(v.getDataVector())
+new ArrayWriter(v, elementVector)
+  case StructType(_) =>
+val v = vector.asInstanceOf[NullableMapVector]
+val children = (0 until v.size()).map { ordinal =>
+  createFieldWriter(v.getChildByOrdinal(ordinal))
+}
+new StructWriter(v, children.toArray)
+  case dt =>
+throw new UnsupportedOperationException(s"Unsupported data type: 
${dt.simpleString}")
+}
+  }
+}
+
+class ArrowWriter(
+val root: VectorSchemaRoot,
+fields: Array[ArrowFieldWriter]) {
+
+  def schema: StructType = StructType(fields.map { f =>
+StructField(f.name, f.dataType, f.nullable)
+  })
+
+  private var count: Int = 0
+
+  def write(row: InternalRow): Unit = {
+var i = 0
+while (i < fields.size) {
+  fields(i).write(row, i)
+  i += 1
+}
+count += 1
+  }
+
+  def finish(): Unit = {
+root.setRowCount(count)
+fields.foreach(_.finish())
+  }
+
+  def reset(): Unit = {
+root.setRowCount(0)
+count = 0
+fields.foreach(_.reset())
+  }
+}
+
+private[arrow] abstract class ArrowFieldWriter {
+
+  def valueVector: ValueVector
+  def valueMutator: ValueVector.Mutator
+
+  def name: String = valueVector.getField().getName()
+  def dataType: DataType = 
ArrowUtils.fromArrowField(valueVector.getField())
+  def nullable: Boolean = valueVector.getField().isNullable()
+
+  def setNull(): Unit
+  def setValue(input: 

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129480076
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
-
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+var closed = false
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
--- End diff --

Is this a bug in Arrow? cc @BryanCutler



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129477483
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
-
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+var closed = false
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
--- End diff --

The `allocator` can be closed twice, but closing the `root` after the `allocator` has been closed throws an exception.
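
Schematically, the failure mode is (illustration only, not the PR code):

```scala
// first cleanup, e.g. when the row iterator is exhausted
root.close()
allocator.close()

// second cleanup from the task-completion listener
root.close()       // throws: the root's buffers belong to an allocator that is
                   // already closed (a second allocator.close() alone would be fine)
allocator.close()
```

Hence the `closed` flag in the listener.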



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r129477058
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
-
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+var closed = false
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
+context.addTaskCompletionListener { _ =>
+  if (!closed) {
--- End diff --

Do we really need this? I think it's OK to close twice.



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-20 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128598428
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "floating_point-double_precision.json")
   }
 
+  ignore("decimal conversion") {
--- End diff --

Arrow integration support for DecimalType isn't slated until v0.6, so it might work, but there is no guarantee that a record batch written in Java will equal the same batch when it is read by Python/C++. Also, we can't test it here until the `JsonFileReader` supports it as well. I filed the Arrow JIRA here: https://issues.apache.org/jira/browse/ARROW-1238



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128313325
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "floating_point-double_precision.json")
   }
 
+  ignore("decimal conversion") {
--- End diff --

That might be true; I haven't looked into it yet. I can work on adding support on the Arrow side, so I'll check on that and see where it stands in the upcoming 0.5 release.



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128312860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128146875
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "floating_point-double_precision.json")
   }
 
+  ignore("decimal conversion") {
--- End diff --

Oh, I'm sorry, I should have mentioned it.
It seems like `JsonFileReader` doesn't support DecimalType, so I ignored it for now.
But now I'm thinking: if Arrow 0.4.0 has a bug for the decimal type as you said, should I remove decimal type support from this PR and add it in follow-up PRs?



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128146851
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()
 

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128146856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala
 ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
+  case BooleanType =>
+new BooleanWriter(vector.asInstanceOf[NullableBitVector])
+  case ByteType =>
+new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
+  case ShortType =>
+new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
+  case IntegerType =>
+new IntegerWriter(vector.asInstanceOf[NullableIntVector])
+  case LongType =>
+new LongWriter(vector.asInstanceOf[NullableBigIntVector])
+  case FloatType =>
+new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
+  case DoubleType =>
+new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
+  case DecimalType.Fixed(precision, scale) =>
+new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], 
precision, scale)
+  case StringType =>
+new StringWriter(vector.asInstanceOf[NullableVarCharVector])
+  case BinaryType =>
+new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
+  case ArrayType(_, _) =>
+val v = vector.asInstanceOf[ListVector]
+val elementVector = createFieldWriter(v.getDataVector())
+new ArrayWriter(v, elementVector)
+  case StructType(_) =>
+val v = vector.asInstanceOf[NullableMapVector]
+val children = (0 until v.size()).map { ordinal =>
+  createFieldWriter(v.getChildByOrdinal(ordinal))
+}
+new StructWriter(v, children.toArray)
+}
+  }
+}
+
+class ArrowWriter(
+val root: VectorSchemaRoot,
+fields: Array[ArrowFieldWriter]) {
+
+  def schema: StructType = StructType(fields.map { f =>
+StructField(f.name, f.dataType, f.nullable)
+  })
+
+  private var count: Int = 0
+
+  def write(row: InternalRow): Unit = {
+var i = 0
+while (i < fields.size) {
+  fields(i).write(row, i)
+  i += 1
+}
+count += 1
+  }
+
+  def finish(): Unit = {
+root.setRowCount(count)
+fields.foreach(_.finish())
+  }
+
+  def reset(): Unit = {
+root.setRowCount(0)
+count = 0
+fields.foreach(_.reset())
+  }
+}
+
+private[sql] abstract class ArrowFieldWriter {
+
+  def valueVector: ValueVector
+  def valueMutator: ValueVector.Mutator
+
+  def name: String = valueVector.getField().getName()
+  def dataType: DataType = 
ArrowUtils.fromArrowField(valueVector.getField())
+  def nullable: Boolean = valueVector.getField().isNullable()
+
+  def setNull(): Unit
  

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128146843
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128064603
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 collectAndValidate(df, json, "floating_point-double_precision.json")
   }
 
+  ignore("decimal conversion") {
--- End diff --

Why ignore this?



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128064385
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala
 ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.complex._
+import org.apache.arrow.vector.util.DecimalUtility
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types._
+
+object ArrowWriter {
+
+  def create(schema: StructType): ArrowWriter = {
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val root = VectorSchemaRoot.create(arrowSchema, 
ArrowUtils.rootAllocator)
+create(root)
+  }
+
+  def create(root: VectorSchemaRoot): ArrowWriter = {
+val children = root.getFieldVectors().asScala.map { vector =>
+  vector.allocateNew()
+  createFieldWriter(vector)
+}
+new ArrowWriter(root, children.toArray)
+  }
+
+  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+val field = vector.getField()
+ArrowUtils.fromArrowField(field) match {
+  case BooleanType =>
+new BooleanWriter(vector.asInstanceOf[NullableBitVector])
+  case ByteType =>
+new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
+  case ShortType =>
+new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
+  case IntegerType =>
+new IntegerWriter(vector.asInstanceOf[NullableIntVector])
+  case LongType =>
+new LongWriter(vector.asInstanceOf[NullableBigIntVector])
+  case FloatType =>
+new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
+  case DoubleType =>
+new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
+  case DecimalType.Fixed(precision, scale) =>
+new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], 
precision, scale)
+  case StringType =>
+new StringWriter(vector.asInstanceOf[NullableVarCharVector])
+  case BinaryType =>
+new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
+  case ArrayType(_, _) =>
+val v = vector.asInstanceOf[ListVector]
+val elementVector = createFieldWriter(v.getDataVector())
+new ArrayWriter(v, elementVector)
+  case StructType(_) =>
+val v = vector.asInstanceOf[NullableMapVector]
+val children = (0 until v.size()).map { ordinal =>
+  createFieldWriter(v.getChildByOrdinal(ordinal))
+}
+new StructWriter(v, children.toArray)
+}
+  }
+}
+
+class ArrowWriter(
+val root: VectorSchemaRoot,
+fields: Array[ArrowFieldWriter]) {
+
+  def schema: StructType = StructType(fields.map { f =>
+StructField(f.name, f.dataType, f.nullable)
+  })
+
+  private var count: Int = 0
+
+  def write(row: InternalRow): Unit = {
+var i = 0
+while (i < fields.size) {
+  fields(i).write(row, i)
+  i += 1
+}
+count += 1
+  }
+
+  def finish(): Unit = {
+root.setRowCount(count)
+fields.foreach(_.finish())
+  }
+
+  def reset(): Unit = {
+root.setRowCount(0)
+count = 0
+fields.foreach(_.reset())
+  }
+}
+
+private[sql] abstract class ArrowFieldWriter {
+
+  def valueVector: ValueVector
+  def valueMutator: ValueVector.Mutator
+
+  def name: String = valueVector.getField().getName()
+  def dataType: DataType = 
ArrowUtils.fromArrowField(valueVector.getField())
+  def nullable: Boolean = valueVector.getField().isNullable()
+
+  def setNull(): Unit

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128062715
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128061312
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r128061219
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] 
(payload: Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
-private[sql] object ArrowPayload {
-
-  /**
-   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
-   */
-  def apply(
-  batch: ArrowRecordBatch,
-  schema: StructType,
-  allocator: BufferAllocator): ArrowPayload = {
-new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, 
allocator))
-  }
-}
-
 private[sql] object ArrowConverters {
 
   /**
-   * Map a Spark DataType to ArrowType.
-   */
-  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
-dataType match {
-  case BooleanType => ArrowType.Bool.INSTANCE
-  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
-  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
-  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
-  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-  case ByteType => new ArrowType.Int(8, true)
-  case StringType => ArrowType.Utf8.INSTANCE
-  case BinaryType => ArrowType.Binary.INSTANCE
-  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
-}
-  }
-
-  /**
-   * Convert a Spark Dataset schema to Arrow schema.
-   */
-  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
-val arrowFields = schema.fields.map { f =>
-  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
-}
-new Schema(arrowFields.toList.asJava)
-  }
-
-  /**
* Maps Iterator from InternalRow to ArrowPayload. Limit 
ArrowRecordBatch size in ArrowPayload
* by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
*/
   private[sql] def toPayloadIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
-new Iterator[ArrowPayload] {
-  private val _allocator = new RootAllocator(Long.MaxValue)
-  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+  maxRecordsPerBatch: Int,
+  context: TaskContext): Iterator[ArrowPayload] = {
 
-  override def hasNext: Boolean = _nextPayload != null
+val arrowSchema = ArrowUtils.toArrowSchema(schema)
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, 
Long.MaxValue)
 
-  override def next(): ArrowPayload = {
-val obj = _nextPayload
-if (hasNext) {
-  if (rowIter.hasNext) {
-_nextPayload = convert()
-  } else {
-_allocator.close()
-_nextPayload = null
-  }
-}
-obj
-  }
-
-  private def convert(): ArrowPayload = {
-val batch = internalRowIterToArrowBatch(rowIter, schema, 
_allocator, maxRecordsPerBatch)
-ArrowPayload(batch, schema, _allocator)
-  }
-}
-  }
-
-  /**
-   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping 
when rowIter is consumed
-   * or the number of records in the batch equals maxRecordsInBatch.  If 
maxRecordsPerBatch is 0,
-   * then rowIter will be fully consumed.
-   */
-  private def internalRowIterToArrowBatch(
-  rowIter: Iterator[InternalRow],
-  schema: StructType,
-  allocator: BufferAllocator,
-  maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
-
-val columnWriters = schema.fields.zipWithIndex.map { case (field, 
ordinal) =>
-  ColumnWriter(field.dataType, ordinal, allocator).init()
-}
+val root = VectorSchemaRoot.create(arrowSchema, allocator)
+val arrowWriter = ArrowWriter.create(root)
 
-val writerLength = columnWriters.length
-var recordsInBatch = 0
-while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < 
maxRecordsPerBatch)) {
-  val row = rowIter.next()
-  var i = 0
-  while (i < writerLength) {
-columnWriters(i).write(row)
-i += 1
-  }
-  recordsInBatch += 1
+context.addTaskCompletionListener { _ =>
+  root.close()
+  allocator.close()

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-17 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r127734562
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column backed by Apache Arrow.
+ */
+public final class ArrowColumnVector extends ColumnVector {
+
+  private ValueVector vector;
+  private ValueVector.Accessor nulls;
+
+  private NullableBitVector boolData;
+  private NullableTinyIntVector byteData;
+  private NullableSmallIntVector shortData;
+  private NullableIntVector intData;
+  private NullableBigIntVector longData;
+
+  private NullableFloat4Vector floatData;
+  private NullableFloat8Vector doubleData;
+  private NullableDecimalVector decimalData;
+
+  private NullableVarCharVector stringData;
+
+  private NullableVarBinaryVector binaryData;
+
+  private UInt4Vector listOffsetData;
+
+  public ArrowColumnVector(ValueVector vector) {
+super(vector.getValueCapacity(), DataTypes.NullType, 
MemoryMode.OFF_HEAP);
+initialize(vector);
+  }
+
+  @Override
+  public long nullsNativeAddress() {
+throw new RuntimeException("Cannot get native address for arrow 
column");
+  }
+
+  @Override
+  public long valuesNativeAddress() {
+throw new RuntimeException("Cannot get native address for arrow 
column");
+  }
+
+  @Override
+  public void close() {
+if (childColumns != null) {
+  for (int i = 0; i < childColumns.length; i++) {
+childColumns[i].close();
+  }
+}
+vector.close();
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public void putNotNull(int rowId) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNull(int rowId) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNulls(int rowId, int count) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNotNulls(int rowId, int count) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+return nulls.isNull(rowId);
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public void putBoolean(int rowId, boolean value) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putBooleans(int rowId, int count, boolean value) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+return boolData.getAccessor().get(rowId) == 1;
--- End diff --

Can we use `nulls` here instead of calling `boolData.getAccessor()`? If so, it 
would be better to use another name instead of `nulls`, since it would no longer 
be used only for null checks.
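
For illustration, a minimal sketch of that idea, keeping a single typed accessor 
per vector and reusing it for both the null check and the value read (Scala-style 
pseudocode with hypothetical names; the actual class under review is Java):

    import org.apache.arrow.vector.NullableBitVector

    // Hypothetical sketch only: one accessor field serves both isNullAt and getBoolean,
    // so a neutral name such as `accessor` reads better than `nulls`.
    class BooleanColumnSketch(boolData: NullableBitVector) {
      private val accessor = boolData.getAccessor()  // typed accessor exposes both isNull() and get()
      def isNullAt(rowId: Int): Boolean = accessor.isNull(rowId)
      def getBoolean(rowId: Int): Boolean = accessor.get(rowId) == 1
    }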



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-17 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18655#discussion_r127704333
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column backed by Apache Arrow.
+ */
+public final class ArrowColumnVector extends ColumnVector {
+
+  private ValueVector vector;
+  private ValueVector.Accessor nulls;
+
+  private NullableBitVector boolData;
+  private NullableTinyIntVector byteData;
+  private NullableSmallIntVector shortData;
+  private NullableIntVector intData;
+  private NullableBigIntVector longData;
+
+  private NullableFloat4Vector floatData;
+  private NullableFloat8Vector doubleData;
+  private NullableDecimalVector decimalData;
+
+  private NullableVarCharVector stringData;
+
+  private NullableVarBinaryVector binaryData;
+
+  private UInt4Vector listOffsetData;
+
+  public ArrowColumnVector(ValueVector vector) {
+super(vector.getValueCapacity(), DataTypes.NullType, 
MemoryMode.OFF_HEAP);
+initialize(vector);
+  }
+
+  @Override
+  public long nullsNativeAddress() {
+throw new RuntimeException("Cannot get native address for arrow 
column");
+  }
+
+  @Override
+  public long valuesNativeAddress() {
+throw new RuntimeException("Cannot get native address for arrow 
column");
+  }
+
+  @Override
+  public void close() {
+if (childColumns != null) {
+  for (int i = 0; i < childColumns.length; i++) {
+childColumns[i].close();
+  }
+}
+vector.close();
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public void putNotNull(int rowId) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNull(int rowId) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNulls(int rowId, int count) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putNotNulls(int rowId, int count) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+return nulls.isNull(rowId);
+  }
+
+  //
+  // APIs dealing with Booleans
+  //
+
+  @Override
+  public void putBoolean(int rowId, boolean value) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void putBooleans(int rowId, int count, boolean value) {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+return boolData.getAccessor().get(rowId) == 1;
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+assert(dictionary == null);
+boolean[] array = new boolean[count];
+for (int i = 0; i < count; ++i) {
+  array[i] = (boolData.getAccessor().get(rowId + i) == 1);
--- End diff --

Can we move `boolData.getAccessor()` out of the loop if it is a loop 
invariant?
Ditto for other types (e.g. `getBytes()`).
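
A minimal sketch of the hoisting being suggested (shown in Scala for brevity; the 
quoted file is Java, but the pattern is the same, and `boolData`, `rowId` and 
`count` are the names from the quoted method):

    // Look up the typed accessor once, outside the loop; it does not change per row.
    val accessor = boolData.getAccessor()
    val array = new Array[Boolean](count)
    var i = 0
    while (i < count) {
      array(i) = accessor.get(rowId + i) == 1
      i += 1
    }
    array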




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

2017-07-17 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/18655

[SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add DecimalType, 
ArrayType and StructType support.

## What changes were proposed in this pull request?

This is a refactoring of `ArrowConverters` and related classes.

1. Introduce `ArrowColumnVector` to read Arrow data in Java.
2. Refactor `ColumnWriter` as `ArrowWriter`.
3. Add `DecimalType`, `ArrayType` and `StructType` support.
4. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` 
creation.

## How was this patch tested?

Added some tests and existing tests.
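
As a rough usage sketch of how the new pieces fit together (simplified and 
hypothetical: `schema` and `rows` stand in for a real StructType and 
Iterator[InternalRow], and error handling, batching and memory release are 
omitted):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ArrowWriter}

    // Write InternalRows into Arrow vectors with the new ArrowWriter ...
    val writer = ArrowWriter.create(schema)   // schema: StructType
    rows.foreach(writer.write)                // rows: Iterator[InternalRow]
    writer.finish()                           // sets the row count on the VectorSchemaRoot

    // ... then read the resulting vectors back through ArrowColumnVector.
    val columns = writer.root.getFieldVectors.asScala.map(new ArrowColumnVector(_))
    // e.g. columns(0).isNullAt(rowId), columns(0).getBoolean(rowId), ...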


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-21440

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18655


commit 1439fe6d320208ccb565d18fc4b7485210068330
Author: Takuya UESHIN 
Date:   2017-07-13T08:45:33Z

Introduce ArrowWriter and ArrowColumnVector.

commit 6fcf700d381a18704b3ee485083ee51c82bbd2f8
Author: Takuya UESHIN 
Date:   2017-07-14T05:48:33Z

Use ArrowWriter for ArrowConverters.

commit 579def2db0a0f015760a458032d3bd916669201c
Author: Takuya UESHIN 
Date:   2017-07-14T09:42:32Z

Refactor ArrowConverters.

commit 58cd46506b02800269380f7c8acb5f9825664cad
Author: Takuya UESHIN 
Date:   2017-07-17T06:30:17Z

Move releasing memory into task completion listener.



