[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136413085

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

I just made #19098 to remove this check - it's not really testing the functionality added here anyway, but maybe another test should be added for checking index-out-of-bounds errors.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136409601 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- I think the problem is that if the Java assertion is compiled out, then no error is produced and the test fails.
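The failure mode described here can be sketched in plain Java (hypothetical class and method names, not Spark's actual code): a Java `assert` statement is a no-op unless the JVM is started with `-ea`, so `intercept[java.lang.AssertionError]` around `batch.getRow(100)` only sees an error when assertions happen to be enabled, which can differ between the sbt and Maven test runs.

```java
public class AssertDemo {
    // Hypothetical stand-in for the bounds check in ColumnarBatch.getRow:
    // a plain Java assert, which is a no-op unless the JVM runs with -ea.
    static int getRow(int rowId, int numRows) {
        assert rowId >= 0 && rowId < numRows : "rowId out of range: " + rowId;
        return rowId;
    }

    public static void main(String[] args) {
        boolean enabled = AssertDemo.class.desiredAssertionStatus();
        boolean threw = false;
        try {
            getRow(100, 11);
        } catch (AssertionError e) {
            threw = true;
        }
        // The AssertionError fires if and only if assertions are enabled, so a
        // test built on intercept[AssertionError] depends on the build config.
        System.out.println(threw == enabled ? "consistent" : "inconsistent");
    }
}
```

Running this prints `consistent` whether or not `-ea` is set, which is exactly the point: the observable behavior of the assert tracks the JVM flag, not the code.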
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136408878 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- Maybe?
```scala
val m = intercept[java.lang.AssertionError] {
  ...
}.getMessage
assert(m.contains(...))
```
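The suggested message check can be transliterated to plain Java as a hedged sketch (hypothetical names; the explicit `throw new AssertionError` stands in for a check that cannot be compiled out): capture the error and assert on its message, not just its type.

```java
public class MessageCheck {
    // Hypothetical bounds check that always throws, unlike a bare `assert`,
    // so the test outcome does not depend on the -ea flag.
    static void getRow(int rowId, int numRows) {
        if (rowId < 0 || rowId >= numRows) {
            throw new AssertionError("rowId out of range: " + rowId);
        }
    }

    public static void main(String[] args) {
        String msg = null;
        try {
            getRow(100, 11);
        } catch (AssertionError e) {
            msg = e.getMessage();
        }
        // Assert on the message content, mirroring `assert(m.contains(...))`.
        System.out.println(msg.contains("out of range"));  // prints: true
    }
}
```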
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136408531 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- Then, please check the error message here.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136408063 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- It's probably because the assert is being compiled out. This should probably not be in the test, then.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136407451 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- Thanks! It seems to happen with Maven only; sbt-hadoop-2.6 passed.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/3480/
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136406559 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- Hmm, that is strange. I'll take a look, thanks.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r136404583 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala (same hunk as quoted above, at `batch.getRow(100)`) --- End diff -- Hi, @BryanCutler and @ueshin. This seems to make the master branch fail. Could you take a look once more? Thank you in advance!
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/3696/testReport/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/3730/testReport/
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18787
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135952994

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }

   /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      context: TaskContext): ArrowRowIterator = {
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+
+    new ArrowRowIterator {
+      private var reader: ArrowFileReader = null
+      private var schemaRead = StructType(Seq.empty)
+      private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
--- End diff --

nvm, I thought the first call of `hasNext` would initialize it.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135952996

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }

+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

Yes, I meant your second comment. We do test the columnar batch with `ArrowColumnVector` in `ColumnarBatchSuite` and also we use it in `ArrowConverters.fromPayloadIterator()`, so I thought we don't need to use it here.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135864769 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala (same hunk as quoted above, at `val batch = new ColumnarBatch(...)`) --- End diff -- Oh, you mean not using `ArrowColumnVector` at all and just making an `Iterator[InternalRow]` some other way? That would probably work, but I figured why not test out the columnar batch this way also.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135863376 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala (same hunk as quoted above, at `val batch = new ColumnarBatch(...)`) --- End diff -- You mean just calling something like `new ColumnarBatch(..).rowIterator()`? We still need to set the number of rows in the batch, I believe.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135862583 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala (same hunk as quoted above, at `.createVector(allocator).asInstanceOf[NullableIntVector]`) --- End diff -- Yes, thanks for catching that.
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135861235 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala (same hunk as quoted above, at `private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty`) --- End diff -- `nextBatch()` returns the row iterator, so `rowIter` needs to be initialized here to the first row in the first batch.
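The initialization pattern under discussion can be sketched generically (hypothetical names; plain Java lists stand in for Arrow record batches): the row iterator is primed with the first batch up front, and `hasNext()` rolls over to the next batch whenever the current one is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchRowIterator implements Iterator<Integer> {
    private final Iterator<List<Integer>> batches;
    private Iterator<Integer> rows;

    BatchRowIterator(Iterator<List<Integer>> batches) {
        this.batches = batches;
        // Like rowIter in fromPayloadIterator: eagerly primed with the first
        // batch so the wrapper is usable before any hasNext() call.
        this.rows = batches.hasNext() ? batches.next().iterator()
                                      : Collections.emptyIterator();
    }

    @Override
    public boolean hasNext() {
        // Skip past exhausted (possibly empty) batches to the next row.
        while (!rows.hasNext() && batches.hasNext()) {
            rows = batches.next().iterator();
        }
        return rows.hasNext();
    }

    @Override
    public Integer next() {
        if (!hasNext()) throw new NoSuchElementException();
        return rows.next();
    }

    public static void main(String[] args) {
        Iterator<List<Integer>> batches = Arrays.asList(
            Arrays.asList(1, 2), Collections.<Integer>emptyList(),
            Arrays.asList(3)).iterator();
        BatchRowIterator rows = new BatchRowIterator(batches);
        List<Integer> out = new ArrayList<>();
        while (rows.hasNext()) out.add(rows.next());
        System.out.println(out);  // prints: [1, 2, 3]
    }
}
```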
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135439310

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create read-only batch") {
--- End diff --

`create a columnar batch from Arrow column vectors` or something?
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135439372 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala --- @@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite { s"vectorized reader")) } } + + test("create read-only batch") { +val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) +val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true) + .createVector(allocator).asInstanceOf[NullableIntVector] +vector1.allocateNew() +val mutator1 = vector1.getMutator() +val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true) + .createVector(allocator).asInstanceOf[NullableIntVector] +vector2.allocateNew() +val mutator2 = vector2.getMutator() + +(0 until 10).foreach { i => + mutator1.setSafe(i, i) + mutator2.setSafe(i + 1, i) +} +mutator1.setNull(10) +mutator1.setValueCount(11) +mutator2.setNull(0) +mutator2.setValueCount(11) + +val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2)) + +val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType))) +val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11) +batch.setNumRows(11) + +assert(batch.numCols() == 2) +assert(batch.numRows() == 11) + +val rowIter = batch.rowIterator().asScala +rowIter.zipWithIndex.foreach { case (row, i) => + if (i == 10) { +assert(row.isNullAt(0)) + } else { +assert(row.getInt(0) == i) + } + if (i == 0) { +assert(row.isNullAt(1)) + } else { +assert(row.getInt(1) == i - 1) + } +} + +intercept[java.lang.AssertionError] { + batch.getRow(100) +} + +columnVectors.foreach(_.close()) --- End diff -- We can use `batch.close()` here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
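The suggestion above relies on the batch owning its column vectors. A minimal sketch of that resource-ownership pattern, using mock stand-in classes (not Spark's actual `ColumnarBatch`/`ColumnVector`), shows why one `batch.close()` can replace closing each vector individually:

```java
// Mock types for illustration only: the batch delegates close() to every
// column it owns, so callers release all Arrow buffers with a single call.
class MockColumnVector implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true; // a real vector would release its Arrow buffers here
    }
}

class MockColumnarBatch implements AutoCloseable {
    private final MockColumnVector[] columns;

    MockColumnarBatch(MockColumnVector[] columns) {
        this.columns = columns;
    }

    @Override
    public void close() {
        for (MockColumnVector c : columns) {
            c.close(); // delegate to every column
        }
    }
}

public class CloseDemo {
    public static void main(String[] args) {
        MockColumnVector v1 = new MockColumnVector();
        MockColumnVector v2 = new MockColumnVector();
        MockColumnarBatch batch = new MockColumnarBatch(new MockColumnVector[]{v1, v2});
        batch.close(); // instead of columnVectors.foreach(_.close())
        System.out.println(v1.closed && v2.closed);
    }
}
```

Either form frees the vectors; the single call just keeps the ownership in one place.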
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135439793

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---

@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
+
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

Btw, do we need to use `ColumnarBatch` for this test? I guess we can simply create an `Iterator[InternalRow]` and use it.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135438683

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---

@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }

+  /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      context: TaskContext): ArrowRowIterator = {
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+
+    new ArrowRowIterator {
+      private var reader: ArrowFileReader = null
+      private var schemaRead = StructType(Seq.empty)
+      private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
--- End diff --

We can simply put `Iterator.empty` here.
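The simplification ueshin suggests works because a batch-flattening iterator can start with an empty row iterator and advance to the next batch lazily inside `hasNext()`. A sketch of that pattern with mock types (plain Java collections standing in for payloads and rows, not Spark's classes):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class FlattenDemo {
    // Flattens an Iterator of "batches" (List<Integer>) into an Iterator of
    // "rows" (Integer). rows starts empty; hasNext() pulls the next batch on
    // demand, so no eager "if (hasNext) nextBatch() else empty" initializer
    // is needed.
    static Iterator<Integer> rowIterator(Iterator<List<Integer>> batches) {
        return new Iterator<Integer>() {
            private Iterator<Integer> rows = Collections.emptyIterator(); // start empty

            @Override
            public boolean hasNext() {
                while (!rows.hasNext() && batches.hasNext()) {
                    rows = batches.next().iterator(); // load next batch lazily
                }
                return rows.hasNext();
            }

            @Override
            public Integer next() {
                return rows.next();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<List<Integer>> batches =
            Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)).iterator();
        Iterator<Integer> rows = rowIterator(batches);
        StringBuilder sb = new StringBuilder();
        while (rows.hasNext()) {
            sb.append(rows.next());
        }
        System.out.println(sb);
    }
}
```

Because all advancement happens in `hasNext()`, the initializer collapses to `Iterator.empty`, which is exactly the review suggestion.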
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r135439857

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---

@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
+
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
--- End diff --

Should the `allocator` and the `vector` be closed at the end of this test?
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132252558

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,35 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

The max `capacity` only has meaning when allocating `ColumnVector`s, so it doesn't really do anything for read-only vectors. You need to call `setNumRows` to tell the batch how many rows there are for the given columns; it doesn't look at the capacity in the individual vectors.
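The capacity-versus-numRows distinction explained above can be sketched with a simplified mock (stand-in types for illustration, not Spark's actual `ColumnarBatch`): capacity bounds what writable vectors could hold, while `numRows` is what readers actually iterate, and it must be set explicitly because the batch never inspects the vectors themselves.

```java
public class CapacityDemo {
    // Simplified stand-in for a columnar batch.
    static class Batch {
        final int capacity;   // allocation bound; meaningless for pre-filled vectors
        private int numRows;  // what numRows()/row iteration report

        Batch(int capacity) {
            this.capacity = capacity;
        }

        void setNumRows(int numRows) {
            if (numRows > capacity) {
                throw new IllegalArgumentException("numRows exceeds capacity");
            }
            this.numRows = numRows;
        }

        int numRows() {
            return numRows;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch(11);          // the ctor only records capacity
        System.out.println(batch.numRows());  // still 0: nothing iterable yet
        batch.setNumRows(11);                 // must be called explicitly
        System.out.println(batch.numRows());
    }
}
```

This mirrors why the factory method both constructs the batch with `numRows` as capacity and then calls `setNumRows(numRows)`.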
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132246475

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,35 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+    batch.setNumRows(numRows);
--- End diff --

The `ArrowColumnVector.valueCount` [here](https://github.com/BryanCutler/spark/blob/23d19dfde53d02c37a2c20f67c9816a73bd57cd2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java#L35) would need to be moved to `ReadOnlyColumnVector`, which could go in place of the capacity. If @ueshin thinks that's ok to do here, I can add that.
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132188490

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,35 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

Why is the capacity set to `numRows` inside the ctor, but `batch.setNumRows()` still needs to be called manually?
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132187027

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,35 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+    batch.setNumRows(numRows);
--- End diff --

Do we need to check that each `ReadOnlyColumnVector` actually has `numRows` rows?
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r131493567

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---

@@ -110,6 +113,67 @@ private[sql] object ArrowConverters {
     }
   }

+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      schema: StructType,
+      context: TaskContext): Iterator[InternalRow] = {
+
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+    var reader: ArrowFileReader = null
+
+    new Iterator[InternalRow] {
+
+      context.addTaskCompletionListener { _ =>
+        close()
+      }
+
+      private var _batch: ColumnarBatch = _
--- End diff --

TODO: not needed
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130680847

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    for (ReadOnlyColumnVector c : columns) {
+      assert(c.capacity >= numRows);
+    }
+    ColumnarBatch batch = create(schema, columns, numRows);
+    batch.setNumRows(numRows);
+    return batch;
+  }
+
+  private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) {
--- End diff --

@ueshin, if we want to allow creating a `ColumnarBatch` from any array of `ColumnVector`s, then we could make this public, as it doesn't call `setNumRows` and assumes they are already allocated.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130679948

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

It doesn't need to be restricted, but if they are `ReadOnlyColumnVector`s then it means they are already populated and it is safe to call `setNumRows(numRows)` here. If it took in any `ColumnVector` then it might cause issues by someone passing in unallocated vectors.
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130672955

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    for (ReadOnlyColumnVector c : columns) {
+      assert(c.capacity >= numRows);
--- End diff --

Maybe this should throw an exception then?
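The concern behind this exchange is that Java `assert` statements are no-ops unless the JVM runs with `-ea`, so in a production build the validation loop would run with an empty body and silently accept undersized vectors. A sketch of the explicit-check alternative being suggested, using mock stand-in types for illustration:

```java
public class CheckCapacityDemo {
    // Simplified stand-in for a read-only column vector.
    static class Vec {
        final int capacity;

        Vec(int capacity) {
            this.capacity = capacity;
        }
    }

    // Explicit check: runs regardless of whether assertions are enabled,
    // unlike `assert(c.capacity >= numRows)`.
    static void checkCapacities(Vec[] columns, int numRows) {
        for (Vec c : columns) {
            if (c.capacity < numRows) {
                throw new IllegalArgumentException(
                    "column capacity " + c.capacity + " < numRows " + numRows);
            }
        }
    }

    public static void main(String[] args) {
        checkCapacities(new Vec[]{new Vec(11), new Vec(11)}, 11); // passes
        try {
            checkCapacities(new Vec[]{new Vec(5)}, 11);           // undersized
            System.out.println("no error");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```

With an exception instead of an assert, the loop always has a meaningful body, which also addresses the "loop with no body executed in production" worry raised later in the thread.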
Github user highfei2011 commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130639092

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

Is it necessary? What impact will it cause?
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130559563

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

Do we need to restrict this to only `ReadOnlyColumnVector`?
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r130553883

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---

@@ -65,15 +65,42 @@ final Row row;

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }

   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    for (ReadOnlyColumnVector c : columns) {
+      assert(c.capacity >= numRows);
--- End diff --

Is there any good way to move this assert into another loop? I am afraid that this loop, whose body is a no-op when assertions are disabled, would still be executed in production.
GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/18787

    [SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors

## What changes were proposed in this pull request?

This PR allows the creation of a `ColumnarBatch` from `ReadOnlyColumnVector`s, where previously a columnar batch could only allocate vectors internally. This is useful for using `ArrowColumnVector`s in batch form to do row-based iteration. Also added `ArrowConverters.fromPayloadIterator`, which converts an `ArrowPayload` iterator to an `InternalRow` iterator and uses a `ColumnarBatch` internally.

## How was this patch tested?

Added a new unit test for creating a `ColumnarBatch` with `ReadOnlyColumnVector`s, and a test to verify the roundtrip of rows -> ArrowPayload -> rows using `toPayloadIterator` and `fromPayloadIterator`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-ColumnarBatch-support-SPARK-21583

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18787.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18787

commit 0c39389cbcf34255d446aac46d1ed01b795e5835
Author: Bryan Cutler
Date: 2017-07-27T22:31:05Z

    refactored ColumnarBatch to allow creating from ColumnVectors

commit a4be6cf0bb0363e394c520b121835e3190c36730
Author: Bryan Cutler
Date: 2017-07-31T19:11:19Z

    Added fromPayloadIterator to use ColumnarBatch for row iteration

commit f35b92c823db4b02edc6de0208fe17ce8b6c96c6
Author: Bryan Cutler
Date: 2017-07-31T21:11:43Z

    added unit tests for ColumnarBatch with Arrow, and fromPayloadIterator