[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136413085
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

I just made #19098 to remove this check - it's not really testing the 
functionality added here anyway, but maybe another test should be added for 
checking index-out-of-bounds errors.
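
For reference, a possible shape for such a follow-up test (a sketch only: it assumes `getRow` were changed to do an explicit runtime bounds check that throws `IndexOutOfBoundsException`, instead of the current Java `assert`):

```scala
test("getRow rejects out-of-bounds indices") {
  val schema = StructType(Seq(StructField("int", IntegerType)))
  // an ordinary writable batch is enough to exercise the bounds check
  val batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, 10)
  batch.setNumRows(10)
  // hypothetical: relies on getRow throwing an explicit exception
  intercept[IndexOutOfBoundsException] {
    batch.getRow(100)
  }
  batch.close()
}
```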





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136409601
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

I think the problem is that if the Java assertion is compiled out (i.e. 
assertions are disabled because the JVM wasn't started with `-ea`), then no 
error is produced and the test fails.
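
A minimal, self-contained illustration of the mechanism (not Spark code): `Class.desiredAssertionStatus()` reports whether `-ea` applies, and that is what decides whether `intercept[AssertionError]` can ever observe an error here.

```scala
object AssertStatusDemo extends App {
  // true only when the JVM was started with -ea / -enableassertions
  val enabled = getClass.desiredAssertionStatus()
  println(s"Java assertions enabled: $enabled")
  // When this prints false, a guard like ColumnarBatch.getRow's
  // `assert(rowId < numRows)` is skipped entirely, so getRow(100) returns
  // normally and intercept[java.lang.AssertionError] fails the test.
}
```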





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136408878
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

Maybe?
```scala
val m = intercept[java.lang.AssertionError] {
...
}.getMessage
assert(m.contains(...))
```
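
A filled-in version of that pattern might look like the sketch below. One caveat: a bare Java `assert(cond)` throws an `AssertionError` with a null message, so this only works if `getRow` asserts with a detail message (the expected text here is an assumption, not taken from the actual code):

```scala
val m = intercept[java.lang.AssertionError] {
  batch.getRow(100)
}.getMessage
// hypothetical message check; the real assertion text would need to be
// confirmed against ColumnarBatch.getRow first
assert(m != null && m.contains("100"))
```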





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136408531
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

Then, please check the error message here.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136408063
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

It's probably because the assert is being compiled out. This should 
probably not be in the test then.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136407451
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

Thanks! It seems to happen with Maven only; sbt-hadoop-2.6 passed.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/3480/





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136406559
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

Hmm, that is strange.  I'll take a look, thanks.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-31 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r136404583
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create columnar batch from Arrow column vectors") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
--- End diff --

Hi, @BryanCutler and @ueshin.
This seems to make the master branch fail. Could you take a look once more? 
Thank you in advance!
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/3696/testReport/
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/3730/testReport/






[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18787





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135952994
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }
 
   /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      context: TaskContext): ArrowRowIterator = {
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+
+    new ArrowRowIterator {
+      private var reader: ArrowFileReader = null
+      private var schemaRead = StructType(Seq.empty)
+      private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
--- End diff --

nvm, I thought the first call of `hasNext` would initialize it.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135952996
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

Yes, I meant your second comment.
We do test the columnar batch with `ArrowColumnVector` in 
`ColumnarBatchSuite`, and we also use it in 
`ArrowConverters.fromPayloadIterator()`, so I thought we didn't need to use it 
here.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135864769
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

Oh, you mean not using `ArrowColumnVector` at all and just making an 
`Iterator[InternalRow]` some other way?  That would probably work, but I 
figured why not test out the columnar batch this way too.
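
For reference, a minimal sketch of that alternative, building the iterator directly without a `ColumnarBatch` (`InternalRow.apply` wraps the values in a `GenericInternalRow`):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// ten non-null ints followed by one null, mirroring the vector in the test
val rows: Iterator[InternalRow] =
  (0 until 10).iterator.map(i => InternalRow(i)) ++ Iterator(InternalRow(null))
```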





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135863376
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

You mean just calling something like `new ColumnarBatch(..).rowIterator()`? 
We still need to set the number of rows in the batch, I believe.
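
A sketch of why the intermediate step matters (taken from the flow in the diff above): the batch reports zero rows until `setNumRows` is called, so `rowIterator()` on a freshly constructed batch yields nothing.

```scala
val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
batch.setNumRows(11)  // without this, rowIterator() produces no rows
val rows = batch.rowIterator().asScala
```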





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135862583
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
--- End diff --

yes, thanks for catching that





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135861235
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }
 
   /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      context: TaskContext): ArrowRowIterator = {
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+
+    new ArrowRowIterator {
+      private var reader: ArrowFileReader = null
+      private var schemaRead = StructType(Seq.empty)
+      private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
--- End diff --

`nextBatch()` returns the row iterator, so `rowIter` needs to be 
initialized here to the rows of the first batch.
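
A simplified sketch of the pattern (using `payloadIter` and `nextBatch()` from the diff above, and dropping the schema/reader bookkeeping): the eager initialization is what lets `hasNext`/`next` then advance batch by batch.

```scala
new Iterator[InternalRow] {
  private var rowIter: Iterator[InternalRow] =
    if (payloadIter.hasNext) nextBatch() else Iterator.empty

  override def hasNext: Boolean = rowIter.hasNext || {
    // current batch exhausted; move on to the next payload, if any
    if (payloadIter.hasNext) { rowIter = nextBatch(); hasNext } else false
  }

  override def next(): InternalRow = rowIter.next()
}
```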





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135439310
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create read-only batch") {
--- End diff --

`create a columnar batch from Arrow column vectors` or something?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135439372
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---
@@ -1261,4 +1264,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
         s"vectorized reader"))
     }
   }
+
+  test("create read-only batch") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector1.allocateNew()
+    val mutator1 = vector1.getMutator()
+    val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector2.allocateNew()
+    val mutator2 = vector2.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator1.setSafe(i, i)
+      mutator2.setSafe(i + 1, i)
+    }
+    mutator1.setNull(10)
+    mutator1.setValueCount(11)
+    mutator2.setNull(0)
+    mutator2.setValueCount(11)
+
+    val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2))
+
+    val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)))
+    val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11)
+    batch.setNumRows(11)
+
+    assert(batch.numCols() == 2)
+    assert(batch.numRows() == 11)
+
+    val rowIter = batch.rowIterator().asScala
+    rowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i == 10) {
+        assert(row.isNullAt(0))
+      } else {
+        assert(row.getInt(0) == i)
+      }
+      if (i == 0) {
+        assert(row.isNullAt(1))
+      } else {
+        assert(row.getInt(1) == i - 1)
+      }
+    }
+
+    intercept[java.lang.AssertionError] {
+      batch.getRow(100)
+    }
+
+    columnVectors.foreach(_.close())
--- End diff --

We can use `batch.close()` here.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135439793
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
+    vector.allocateNew()
+    val mutator = vector.getMutator()
+
+    (0 until 10).foreach { i =>
+      mutator.setSafe(i, i)
+    }
+    mutator.setNull(10)
+    mutator.setValueCount(11)
+
+    val schema = StructType(Seq(StructField("int", IntegerType)))
+
+    val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
--- End diff --

Btw, do we need to use `ColumnarBatch` for this test?
I guess we can simply create `Iterator[InternalRow]` and use it.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135438683
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }
 
   /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      context: TaskContext): ArrowRowIterator = {
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+
+    new ArrowRowIterator {
+      private var reader: ArrowFileReader = null
+      private var schemaRead = StructType(Seq.empty)
+      private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty
--- End diff --

We can simply put `Iterator.empty` here.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r135439857
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1629,6 +1632,39 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
+  test("roundtrip payloads") {
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue)
+    val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
+      .createVector(allocator).asInstanceOf[NullableIntVector]
--- End diff --

Should the `allocator` and the `vector` be closed at the end of this test?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132252558
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

The max `capacity` only has meaning when allocating `ColumnVector`s, so it 
doesn't really do anything for read-only vectors.  You need to call 
`setNumRows` to tell the batch how many rows there are for the given columns; 
it doesn't look at the capacity of the individual vectors.
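
In other words, a minimal sketch (using the public constructor from the test above):

```scala
// the capacity argument is effectively unused for pre-populated read-only vectors
val batch = new ColumnarBatch(schema, columns, 11)
batch.setNumRows(11)  // this, not the vectors' capacity, drives numRows()/rowIterator()
```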





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132246475
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+    batch.setNumRows(numRows);
--- End diff --

The `ArrowColumnVector.valueCount` [here](https://github.com/BryanCutler/spark/blob/23d19dfde53d02c37a2c20f67c9816a73bd57cd2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java#L35) 
would need to be moved to `ReadOnlyColumnVector`, which could go in place of 
the capacity.  If @ueshin thinks it's OK to do that here, I can add it.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132188490
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

Why is the capacity set to `numRows` inside the ctor, but we still need to 
call `batch.setNumRows()` manually?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132187027
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+    batch.setNumRows(numRows);
--- End diff --

Do we need to check that each `ReadOnlyColumnVector` has `numRows` rows?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-04 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r131493567
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -110,6 +113,67 @@ private[sql] object ArrowConverters {
     }
   }
 
+  private[sql] def fromPayloadIterator(
+      payloadIter: Iterator[ArrowPayload],
+      schema: StructType,
+      context: TaskContext): Iterator[InternalRow] = {
+
+    val allocator =
+      ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue)
+    var reader: ArrowFileReader = null
+
+    new Iterator[InternalRow] {
+
+      context.addTaskCompletionListener { _ =>
+        close()
+      }
+
+      private var _batch: ColumnarBatch = _
--- End diff --

TODO: not needed





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130680847
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    for (ReadOnlyColumnVector c: columns) {
+      assert(c.capacity >= numRows);
+    }
+    ColumnarBatch batch = create(schema, columns, numRows);
+    batch.setNumRows(numRows);
+    return batch;
+  }
+
+  private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) {
--- End diff --

@ueshin, if we want to allow creating a `ColumnarBatch` from any array of 
`ColumnVector`s, then we could make this public, since it doesn't call 
`setNumRows` or assume the vectors are already allocated.
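
Hypothetical usage if `create` were made public (names as in the diff above; this is a sketch of the proposal, not merged API):

```scala
// the caller supplies already-populated vectors and manages the row count itself
val batch = ColumnarBatch.create(schema, columns, numRows)
batch.setNumRows(numRows)
```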





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130679948
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

It doesn't need to be restricted, but if they are `ReadOnlyColumnVector`s 
then it means they are already populated and it is safe to call 
`setNumRows(numRows)` here.  If it took in any `ColumnVector`, it might 
cause issues if someone passed in unallocated vectors.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130672955
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode) {
-return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, 
DEFAULT_MEMORY_MODE);
+return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode, int maxRows) {
-return new ColumnarBatch(schema, maxRows, memMode);
+ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int 
maxRows, MemoryMode memMode) {
+ColumnVector[] columns = new ColumnVector[schema.size()];
+for (int i = 0; i < schema.fields().length; ++i) {
+  StructField field = schema.fields()[i];
+  columns[i] = ColumnVector.allocate(maxRows, field.dataType(), 
memMode);
+}
+return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+  StructType schema,
+  ReadOnlyColumnVector[] columns,
+  int numRows) {
+for (ReadOnlyColumnVector c: columns) {
+  assert(c.capacity >= numRows);
--- End diff --

Maybe this should throw an exception then?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread highfei2011
Github user highfei2011 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130639092
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
  }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

Is this restriction necessary? What impact will it have?




[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130559563
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
  }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
--- End diff --

Do we need to restrict this to only `ReadOnlyColumnVector`?
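
If not, the factory in this diff could simply take the base type — a sketch of the alternative being asked about (the body is elided here; only the parameter type changes):

    public static ColumnarBatch create(
        StructType schema,
        ColumnVector[] columns,    // was: ReadOnlyColumnVector[]
        int numRows) {
      for (ColumnVector c : columns) {
        assert(c.capacity >= numRows);
      }
      // ...rest as in createReadOnly above...
    }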





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r130553883
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,42 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateVectors(schema, maxRows, memMode);
+    return create(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    for (ReadOnlyColumnVector c: columns) {
+      assert(c.capacity >= numRows);
--- End diff --

Is there any good way to move this assert into another loop? I am afraid that a loop whose only body is an assert still executes (as an empty loop) in production, where assertions are disabled.
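
One standard Java idiom for that, sketched here (the helper name is made up): wrap the loop in a boolean method and invoke it through assert, so the entire call, loop included, is elided when assertions are disabled:

    public static ColumnarBatch createReadOnly(
        StructType schema,
        ReadOnlyColumnVector[] columns,
        int numRows) {
      // The call below disappears entirely on JVMs running without -ea,
      // so no empty loop executes in production.
      assert capacitiesAtLeast(columns, numRows);
      // ...rest of createReadOnly unchanged...
    }

    // Hypothetical helper: always returns true; the checking happens inside.
    private static boolean capacitiesAtLeast(ReadOnlyColumnVector[] columns, int numRows) {
      for (ReadOnlyColumnVector c : columns) {
        assert(c.capacity >= numRows);
      }
      return true;
    }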





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-07-31 Thread BryanCutler
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/18787

[SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors

## What changes were proposed in this pull request?

This PR allows a `ColumnarBatch` to be created from `ReadOnlyColumnVector`s, where previously a columnar batch could only allocate its vectors internally. This is useful for putting `ArrowColumnVector`s in batch form to do row-based iteration. It also adds `ArrowConverters.fromPayloadIterator`, which converts an `ArrowPayload` iterator to an `InternalRow` iterator and uses a `ColumnarBatch` internally.
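
As a rough usage sketch (class, method, and variable names here are illustrative, not from the patch): once vectors have been populated externally, e.g. by Arrow, the new factory wraps them and `rowIterator()` walks them row by row:

    import java.util.Iterator;

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    import org.apache.spark.sql.execution.vectorized.ReadOnlyColumnVector;
    import org.apache.spark.sql.types.StructType;

    class ReadOnlyBatchExample {
      // Assumes each vector in `columns` was already filled with `numRows`
      // values, e.g. ArrowColumnVectors wrapping Arrow field vectors.
      static void dumpFirstColumn(StructType schema, ReadOnlyColumnVector[] columns, int numRows) {
        ColumnarBatch batch = ColumnarBatch.createReadOnly(schema, columns, numRows);
        Iterator<ColumnarBatch.Row> it = batch.rowIterator();
        while (it.hasNext()) {
          ColumnarBatch.Row row = it.next();
          System.out.println(row.isNullAt(0) ? "null" : String.valueOf(row.getInt(0)));
        }
      }
    }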

## How was this patch tested?

Added a new unit test for creating a `ColumnarBatch` with 
`ReadOnlyColumnVectors` and a test to verify the roundtrip of rows -> 
ArrowPayload -> rows, using `toPayloadIterator` and `fromPayloadIterator`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BryanCutler/spark arrow-ColumnarBatch-support-SPARK-21583

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18787.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18787


commit 0c39389cbcf34255d446aac46d1ed01b795e5835
Author: Bryan Cutler 
Date:   2017-07-27T22:31:05Z

refactored ColumnarBatch to allow creating from ColumnVectors

commit a4be6cf0bb0363e394c520b121835e3190c36730
Author: Bryan Cutler 
Date:   2017-07-31T19:11:19Z

Added fromPayloadIterator to use ColumnarBatch for row iteration

commit f35b92c823db4b02edc6de0208fe17ce8b6c96c6
Author: Bryan Cutler 
Date:   2017-07-31T21:11:43Z

added unit tests for ColumnarBatch with Arrow, and fromPayloadIterator



