[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18655 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129735129 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -77,95 +59,55 @@ private[sql] object ArrowConverters { private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +var closed = false -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 +context.addTaskCompletionListener { _ => + if (!closed) { --- End diff -- I filed https://issues.apache.org/jira/browse/ARROW-1283 to fix this. For now, it looks like we need this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129488362 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "nanData-floating_point.json") } + test("array type conversion") { +val json = + s""" + |{ + | "schema" : { + |"fields" : [ { + | "name" : "a_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : false, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { + |"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "b_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : false, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { + |"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "c_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : true, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { + 
|"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "d_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : true, + |"type" : { + | "name" : "list" + |}, + |"children" : [ { + | "name" : "element", + | "nullable" : false, + | "type" : { + |"name" : "int", + |
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129487716 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { --- End diff -- Thanks, I'll modify it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129486273 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.execution.vectorized.ArrowColumnVector +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class ArrowWriterSuite extends SparkFunSuite { + + test("simple") { +def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) => Any): Unit = { + val schema = new StructType().add("value", dt, nullable = true) + val writer = ArrowWriter.create(schema) + assert(writer.schema === schema) + + data.foreach { datum => +writer.write(InternalRow(datum)) + } + writer.finish() + + val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0)) + data.zipWithIndex.foreach { +case (null, rowId) => assert(reader.isNullAt(rowId)) +case (datum, rowId) => assert(get(reader, rowId) === datum) --- End diff -- Thanks, I'll update it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129486171 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { + case BooleanType => +new BooleanWriter(vector.asInstanceOf[NullableBitVector]) + case ByteType => +new ByteWriter(vector.asInstanceOf[NullableTinyIntVector]) + case ShortType => +new ShortWriter(vector.asInstanceOf[NullableSmallIntVector]) + case IntegerType => +new IntegerWriter(vector.asInstanceOf[NullableIntVector]) + case LongType => +new LongWriter(vector.asInstanceOf[NullableBigIntVector]) + case FloatType => +new FloatWriter(vector.asInstanceOf[NullableFloat4Vector]) + case DoubleType => +new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector]) + case StringType => +new StringWriter(vector.asInstanceOf[NullableVarCharVector]) + case BinaryType => +new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector]) + case ArrayType(_, _) => +val v = vector.asInstanceOf[ListVector] +val elementVector = createFieldWriter(v.getDataVector()) +new ArrayWriter(v, elementVector) + case StructType(_) => +val v = vector.asInstanceOf[NullableMapVector] +val children = (0 until v.size()).map { ordinal => + 
createFieldWriter(v.getChildByOrdinal(ordinal)) +} +new StructWriter(v, children.toArray) + case dt => +throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}") +} + } +} + +class ArrowWriter( +val root: VectorSchemaRoot, +fields: Array[ArrowFieldWriter]) { + + def schema: StructType = StructType(fields.map { f => +StructField(f.name, f.dataType, f.nullable) + }) + + private var count: Int = 0 + + def write(row: InternalRow): Unit = { +var i = 0 +while (i < fields.size) { + fields(i).write(row, i) + i += 1 +} +count += 1 + } + + def finish(): Unit = { +root.setRowCount(count) +fields.foreach(_.finish()) + } + + def reset(): Unit = { +root.setRowCount(0) +count = 0 +fields.foreach(_.reset()) + } +} + +private[arrow] abstract class ArrowFieldWriter { + + def valueVector: ValueVector + def valueMutator: ValueVector.Mutator + + def name: String = valueVector.getField().getName() + def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField()) + def nullable: Boolean = valueVector.getField().isNullable() + + def setNull(): Unit + def setValue(input:
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129484715 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -77,95 +59,55 @@ private[sql] object ArrowConverters { private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +var closed = false -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 +context.addTaskCompletionListener { _ => + if (!closed) { --- End diff -- The root just releases the buffers from the FieldVectors, so I would think it should be able to handle being closed twice. I'll check tomorrow if that seems reasonable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129482346 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "nanData-floating_point.json") } + test("array type conversion") { +val json = + s""" + |{ + | "schema" : { + |"fields" : [ { + | "name" : "a_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : false, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { + |"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "b_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : false, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { + |"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "c_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : true, + |"type" : { + | "name" : "int", + | "bitWidth" : 32, + | "isSigned" : true + |}, + |"children" : [ ], + |"typeLayout" : { + | "vectors" : [ { + |"type" : "VALIDITY", + |"typeBitWidth" : 1 + | }, { 
+ |"type" : "DATA", + |"typeBitWidth" : 32 + | } ] + |} + | } ], + | "typeLayout" : { + |"vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + |}, { + | "type" : "OFFSET", + | "typeBitWidth" : 32 + |} ] + | } + |}, { + | "name" : "d_arr", + | "nullable" : true, + | "type" : { + |"name" : "list" + | }, + | "children" : [ { + |"name" : "element", + |"nullable" : true, + |"type" : { + | "name" : "list" + |}, + |"children" : [ { + | "name" : "element", + | "nullable" : false, + | "type" : { + |"name" : "int", + |
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129481638 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { --- End diff -- Would it be better to do as below? ```scala (ArrowUtils.fromArrowField(field), vector) match { case (_: BooleanType, vector: NullableBitVector) => new BooleanWriter(vector) case (_: ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector) ... ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129480466 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.execution.vectorized.ArrowColumnVector +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class ArrowWriterSuite extends SparkFunSuite { + + test("simple") { +def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) => Any): Unit = { + val schema = new StructType().add("value", dt, nullable = true) + val writer = ArrowWriter.create(schema) + assert(writer.schema === schema) + + data.foreach { datum => +writer.write(InternalRow(datum)) + } + writer.finish() + + val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0)) + data.zipWithIndex.foreach { +case (null, rowId) => assert(reader.isNullAt(rowId)) +case (datum, rowId) => assert(get(reader, rowId) === datum) --- End diff -- We can do something like ``` dt match { case BooleanType => reader.getBoolean(rowId) case IntegerType => ... ... } ``` Then the caller side doesn't need to pass in a `get` function. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129480291 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { + case BooleanType => +new BooleanWriter(vector.asInstanceOf[NullableBitVector]) + case ByteType => +new ByteWriter(vector.asInstanceOf[NullableTinyIntVector]) + case ShortType => +new ShortWriter(vector.asInstanceOf[NullableSmallIntVector]) + case IntegerType => +new IntegerWriter(vector.asInstanceOf[NullableIntVector]) + case LongType => +new LongWriter(vector.asInstanceOf[NullableBigIntVector]) + case FloatType => +new FloatWriter(vector.asInstanceOf[NullableFloat4Vector]) + case DoubleType => +new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector]) + case StringType => +new StringWriter(vector.asInstanceOf[NullableVarCharVector]) + case BinaryType => +new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector]) + case ArrayType(_, _) => +val v = vector.asInstanceOf[ListVector] +val elementVector = createFieldWriter(v.getDataVector()) +new ArrayWriter(v, elementVector) + case StructType(_) => +val v = vector.asInstanceOf[NullableMapVector] +val children = (0 until v.size()).map { ordinal => + 
createFieldWriter(v.getChildByOrdinal(ordinal)) +} +new StructWriter(v, children.toArray) + case dt => +throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}") +} + } +} + +class ArrowWriter( +val root: VectorSchemaRoot, +fields: Array[ArrowFieldWriter]) { + + def schema: StructType = StructType(fields.map { f => +StructField(f.name, f.dataType, f.nullable) + }) + + private var count: Int = 0 + + def write(row: InternalRow): Unit = { +var i = 0 +while (i < fields.size) { + fields(i).write(row, i) + i += 1 +} +count += 1 + } + + def finish(): Unit = { +root.setRowCount(count) +fields.foreach(_.finish()) + } + + def reset(): Unit = { +root.setRowCount(0) +count = 0 +fields.foreach(_.reset()) + } +} + +private[arrow] abstract class ArrowFieldWriter { + + def valueVector: ValueVector + def valueMutator: ValueVector.Mutator + + def name: String = valueVector.getField().getName() + def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField()) + def nullable: Boolean = valueVector.getField().isNullable() + + def setNull(): Unit + def setValue(input:
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129480076 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -77,95 +59,55 @@ private[sql] object ArrowConverters { private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +var closed = false -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 +context.addTaskCompletionListener { _ => + if (!closed) { --- End diff -- is this a bug in arrow? cc @BryanCutler --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129477483 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -77,95 +59,55 @@ private[sql] object ArrowConverters { private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +var closed = false -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 +context.addTaskCompletionListener { _ => + if (!closed) { --- End diff -- The `allocator` can be closed twice, but the `root` throws an exception after `allocator` is closed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r129477058 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -77,95 +59,55 @@ private[sql] object ArrowConverters { private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +var closed = false -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 +context.addTaskCompletionListener { _ => + if (!closed) { --- End diff -- do we really need this? I think it's ok to close twice? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128598428 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "floating_point-double_precision.json") } + ignore("decimal conversion") { --- End diff -- Arrow integration support for DecimalType isn't slated until v0.6, so it might work, but there is no guarantee that a record batch written in Java will be equal to the same batch when it is read by Python/C++. Also, we can't test it here until `JsonFileReader` supports it as well. I filed the Arrow JIRA here: https://issues.apache.org/jira/browse/ARROW-1238 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128313325 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "floating_point-double_precision.json") } + ignore("decimal conversion") { --- End diff -- That might be true, I haven't looked into it yet. I can work on adding support on the Arrow side, so I'll try to check on that and see where it stands in the upcoming 0.5 release. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128312860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146875 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "floating_point-double_precision.json") } + ignore("decimal conversion") { --- End diff -- Oh, I'm sorry, I should have mentioned it. It seems like `JsonFileReader` doesn't support DecimalType, so I ignored it for now. But now I'm wondering: if Arrow 0.4.0 has a bug for the decimal type as you said, should I remove decimal type support from this PR and add it in a follow-up PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146851 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146856 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.vectorized + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { + case BooleanType => +new BooleanWriter(vector.asInstanceOf[NullableBitVector]) + case ByteType => +new ByteWriter(vector.asInstanceOf[NullableTinyIntVector]) + case ShortType => +new ShortWriter(vector.asInstanceOf[NullableSmallIntVector]) + case IntegerType => +new IntegerWriter(vector.asInstanceOf[NullableIntVector]) + case LongType => +new LongWriter(vector.asInstanceOf[NullableBigIntVector]) + case FloatType => +new FloatWriter(vector.asInstanceOf[NullableFloat4Vector]) + case DoubleType => +new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector]) + case DecimalType.Fixed(precision, scale) => +new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], precision, scale) + case StringType => +new StringWriter(vector.asInstanceOf[NullableVarCharVector]) + case BinaryType => +new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector]) + case ArrayType(_, _) => +val v = vector.asInstanceOf[ListVector] +val elementVector = createFieldWriter(v.getDataVector()) +new ArrayWriter(v, elementVector) + 
case StructType(_) => +val v = vector.asInstanceOf[NullableMapVector] +val children = (0 until v.size()).map { ordinal => + createFieldWriter(v.getChildByOrdinal(ordinal)) +} +new StructWriter(v, children.toArray) +} + } +} + +class ArrowWriter( +val root: VectorSchemaRoot, +fields: Array[ArrowFieldWriter]) { + + def schema: StructType = StructType(fields.map { f => +StructField(f.name, f.dataType, f.nullable) + }) + + private var count: Int = 0 + + def write(row: InternalRow): Unit = { +var i = 0 +while (i < fields.size) { + fields(i).write(row, i) + i += 1 +} +count += 1 + } + + def finish(): Unit = { +root.setRowCount(count) +fields.foreach(_.finish()) + } + + def reset(): Unit = { +root.setRowCount(0) +count = 0 +fields.foreach(_.reset()) + } +} + +private[sql] abstract class ArrowFieldWriter { + + def valueVector: ValueVector + def valueMutator: ValueVector.Mutator + + def name: String = valueVector.getField().getName() + def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField()) + def nullable: Boolean = valueVector.getField().isNullable() + + def setNull(): Unit
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128146843 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128064603 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "floating_point-double_precision.json") } + ignore("decimal conversion") { --- End diff -- Why ignore this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128064385 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.vectorized + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.util.DecimalUtility + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.types._ + +object ArrowWriter { + + def create(schema: StructType): ArrowWriter = { +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) +create(root) + } + + def create(root: VectorSchemaRoot): ArrowWriter = { +val children = root.getFieldVectors().asScala.map { vector => + vector.allocateNew() + createFieldWriter(vector) +} +new ArrowWriter(root, children.toArray) + } + + private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = { +val field = vector.getField() +ArrowUtils.fromArrowField(field) match { + case BooleanType => +new BooleanWriter(vector.asInstanceOf[NullableBitVector]) + case ByteType => +new ByteWriter(vector.asInstanceOf[NullableTinyIntVector]) + case ShortType => +new ShortWriter(vector.asInstanceOf[NullableSmallIntVector]) + case IntegerType => +new IntegerWriter(vector.asInstanceOf[NullableIntVector]) + case LongType => +new LongWriter(vector.asInstanceOf[NullableBigIntVector]) + case FloatType => +new FloatWriter(vector.asInstanceOf[NullableFloat4Vector]) + case DoubleType => +new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector]) + case DecimalType.Fixed(precision, scale) => +new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], precision, scale) + case StringType => +new StringWriter(vector.asInstanceOf[NullableVarCharVector]) + case BinaryType => +new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector]) + case ArrayType(_, _) => +val v = vector.asInstanceOf[ListVector] +val elementVector = createFieldWriter(v.getDataVector()) +new ArrayWriter(v, elementVector) + 
case StructType(_) => +val v = vector.asInstanceOf[NullableMapVector] +val children = (0 until v.size()).map { ordinal => + createFieldWriter(v.getChildByOrdinal(ordinal)) +} +new StructWriter(v, children.toArray) +} + } +} + +class ArrowWriter( +val root: VectorSchemaRoot, +fields: Array[ArrowFieldWriter]) { + + def schema: StructType = StructType(fields.map { f => +StructField(f.name, f.dataType, f.nullable) + }) + + private var count: Int = 0 + + def write(row: InternalRow): Unit = { +var i = 0 +while (i < fields.size) { + fields(i).write(row, i) + i += 1 +} +count += 1 + } + + def finish(): Unit = { +root.setRowCount(count) +fields.foreach(_.finish()) + } + + def reset(): Unit = { +root.setRowCount(0) +count = 0 +fields.foreach(_.reset()) + } +} + +private[sql] abstract class ArrowFieldWriter { + + def valueVector: ValueVector + def valueMutator: ValueVector.Mutator + + def name: String = valueVector.getField().getName() + def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField()) + def nullable: Boolean = valueVector.getField().isNullable() + + def setNull():
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128062715 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128061312 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r128061219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala --- @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { -new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - private[sql] object ArrowConverters { /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { -dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") -} - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { -val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) -} -new Schema(arrowFields.toList.asJava) - } - - /** * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. */ private[sql] def toPayloadIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { -new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null + maxRecordsPerBatch: Int, + context: TaskContext): Iterator[ArrowPayload] = { - override def hasNext: Boolean = _nextPayload != null +val arrowSchema = ArrowUtils.toArrowSchema(schema) +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) - override def next(): ArrowPayload = { -val obj = _nextPayload -if (hasNext) { - if (rowIter.hasNext) { -_nextPayload = convert() - } else { -_allocator.close() -_nextPayload = null - } -} -obj - } - - private def convert(): ArrowPayload = { -val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) -ArrowPayload(batch, schema, _allocator) - } -} - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. 
- */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - -val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() -} +val root = VectorSchemaRoot.create(arrowSchema, allocator) +val arrowWriter = ArrowWriter.create(root) -val writerLength = columnWriters.length -var recordsInBatch = 0 -while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { -columnWriters(i).write(row) -i += 1 - } - recordsInBatch += 1 +context.addTaskCompletionListener { _ => + root.close() + allocator.close()
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r127734562 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import org.apache.arrow.vector.*; +import org.apache.arrow.vector.complex.*; +import org.apache.arrow.vector.holders.NullableVarCharHolder; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by Apache Arrow. 
+ */ +public final class ArrowColumnVector extends ColumnVector { + + private ValueVector vector; + private ValueVector.Accessor nulls; + + private NullableBitVector boolData; + private NullableTinyIntVector byteData; + private NullableSmallIntVector shortData; + private NullableIntVector intData; + private NullableBigIntVector longData; + + private NullableFloat4Vector floatData; + private NullableFloat8Vector doubleData; + private NullableDecimalVector decimalData; + + private NullableVarCharVector stringData; + + private NullableVarBinaryVector binaryData; + + private UInt4Vector listOffsetData; + + public ArrowColumnVector(ValueVector vector) { +super(vector.getValueCapacity(), DataTypes.NullType, MemoryMode.OFF_HEAP); +initialize(vector); + } + + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for arrow column"); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for arrow column"); + } + + @Override + public void close() { +if (childColumns != null) { + for (int i = 0; i < childColumns.length; i++) { +childColumns[i].close(); + } +} +vector.close(); + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNotNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int rowId) { +return nulls.isNull(rowId); + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { +throw new 
UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int rowId) { +return boolData.getAccessor().get(rowId) == 1; --- End diff -- Can we use `nulls`? If so, it would be better to use another name instead of `nulls`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18655#discussion_r127704333 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import org.apache.arrow.vector.*; +import org.apache.arrow.vector.complex.*; +import org.apache.arrow.vector.holders.NullableVarCharHolder; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column backed by Apache Arrow. 
+ */ +public final class ArrowColumnVector extends ColumnVector { + + private ValueVector vector; + private ValueVector.Accessor nulls; + + private NullableBitVector boolData; + private NullableTinyIntVector byteData; + private NullableSmallIntVector shortData; + private NullableIntVector intData; + private NullableBigIntVector longData; + + private NullableFloat4Vector floatData; + private NullableFloat8Vector doubleData; + private NullableDecimalVector decimalData; + + private NullableVarCharVector stringData; + + private NullableVarBinaryVector binaryData; + + private UInt4Vector listOffsetData; + + public ArrowColumnVector(ValueVector vector) { +super(vector.getValueCapacity(), DataTypes.NullType, MemoryMode.OFF_HEAP); +initialize(vector); + } + + @Override + public long nullsNativeAddress() { +throw new RuntimeException("Cannot get native address for arrow column"); + } + + @Override + public long valuesNativeAddress() { +throw new RuntimeException("Cannot get native address for arrow column"); + } + + @Override + public void close() { +if (childColumns != null) { + for (int i = 0; i < childColumns.length; i++) { +childColumns[i].close(); + } +} +vector.close(); + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNull(int rowId) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public void putNotNulls(int rowId, int count) { +throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int rowId) { +return nulls.isNull(rowId); + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, boolean value) { +throw new UnsupportedOperationException(); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { +throw new 
UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int rowId) { +return boolData.getAccessor().get(rowId) == 1; + } + + @Override + public boolean[] getBooleans(int rowId, int count) { +assert(dictionary == null); +boolean[] array = new boolean[count]; +for (int i = 0; i < count; ++i) { + array[i] = (boolData.getAccessor().get(rowId + i) == 1); --- End diff -- Can we move `boolData.getAccessor()` out of the loop if it is a loop invariant? Ditto for other types (e.g. `getBytes()`). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/18655 [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add DecimalType, ArrayType and StructType support. ## What changes were proposed in this pull request? This is a refactoring of `ArrowConverters` and related classes. 1. Introduce `ArrowColumnVector` to read Arrow data in Java. 2. Refactor `ColumnWriter` as `ArrowWriter`. 3. Add `DecimalType`, `ArrayType` and `StructType` support. 4. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation. ## How was this patch tested? Added some tests and existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ueshin/apache-spark issues/SPARK-21440 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18655.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18655 commit 1439fe6d320208ccb565d18fc4b7485210068330 Author: Takuya UESHIN Date: 2017-07-13T08:45:33Z Introduce ArrowWriter and ArrowColumnVector. commit 6fcf700d381a18704b3ee485083ee51c82bbd2f8 Author: Takuya UESHIN Date: 2017-07-14T05:48:33Z Use ArrowWriter for ArrowConverters. commit 579def2db0a0f015760a458032d3bd916669201c Author: Takuya UESHIN Date: 2017-07-14T09:42:32Z Refactor ArrowConverters. commit 58cd46506b02800269380f7c8acb5f9825664cad Author: Takuya UESHIN Date: 2017-07-17T06:30:17Z Move releasing memory into task completion listener. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org