[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-12-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22275


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-11-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r232420076
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff --

I like the new tests; I think 0.1 on one of the partitions is enough.


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-11-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r232420015
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,34 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        def delay_first_part(partition_index, iterator):
+            if partition_index == 0:
+                time.sleep(0.1)
--- End diff --

I like this :)


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-11-08 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r232145973
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff --

@holdenk, I updated the tests, please take another look when you get a chance. Thanks!


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r231311398
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff --

Yeah, it's not a guarantee, but with a large number of partitions there's a pretty slim chance they will all arrive in order. I can also add a case with some delay. My only concern is how long to make the delay to be sure it's enough, without adding wasted time to the tests.

How about we keep the case with a large number of partitions and add a case with a 100ms delay on the first partition?
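
For reference, a rough sketch of how such a delay could be wired into the existing test helpers (the method name, the `mapPartitionsWithIndex` wiring and the 0.1s value here are illustrative, not necessarily the PR's final code):

```python
import time

def test_toPandas_batch_order_with_delay(self):
    # Sleep in partition 0 so that later partitions likely reach the driver first,
    # forcing the out-of-order collection path to be exercised.
    def delay_first_part(partition_index, iterator):
        if partition_index == 0:
            time.sleep(0.1)
        return iterator

    df = self.spark.range(64, numPartitions=8).toDF("a")
    delayed_df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
    with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}):
        pdf, pdf_arrow = self._toPandas_arrow_toggle(delayed_df)
        self.assertPandasEqual(pdf, pdf_arrow)
```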


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-11-02 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r230423471
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff --

I don't see how we're guaranteeing out-of-order arrival from the JVM. Could we add a delay on one of the early partitions to guarantee out-of-order collection?


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-10-30 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r229522939
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff --

@holdenk and @felixcheung, I didn't do a loop, but I chose several different levels of partition numbers to be more sure that the partitions won't end up in order. I also added some other cases with different partition/batch ratios. Let me know if you think we need more to be sure here.
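
Presumably the tail of the test (cut off in the diff above) then just feeds each tuple into `run_test`, along the lines of:

```python
for case in cases:
    run_test(*case)
```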


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-10-06 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r223197940
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+        df = self.spark.range(64, numPartitions=8).toDF("a")
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
+            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+            self.assertPandasEqual(pdf, pdf_arrow)
--- End diff --

that sounds good


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r223116201
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
+  }
+  partitionCount += 1
+
+  // After last batch, end the stream and write batch order
+  if (partitionCount == numPartitions) {
+batchWriter.end()
+out.writeInt(batchOrder.length)
+// Batch order indices are from 0 to N-1 batches, sorted by order they arrived
--- End diff --

yeah, sounds good


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r223116082
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
--- End diff --

yup!


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219556033
  
--- Diff: python/pyspark/serializers.py ---
@@ -208,8 +214,26 @@ def load_stream(self, stream):
         for batch in reader:
             yield batch
 
+        if self.load_batch_order:
+            num = read_int(stream)
+            self.batch_order = []
+            for i in xrange(num):
+                index = read_int(stream)
+                self.batch_order.append(index)
+
+    def get_batch_order_and_reset(self):
--- End diff --

Looking at `_load_from_socket` I think I understand why this was done as a separate function here, but what if the serializer itself returned a tuple, or re-ordered the batches itself?

I'm just trying to get a better understanding, not saying those are better designs.
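
For comparison, a rough sketch of the alternative being floated here, where `load_stream` buffers the batches and yields them already re-ordered instead of exposing a separate accessor (illustration only, not the PR's code):

```python
import pyarrow as pa
from pyspark.serializers import read_int

def load_stream(self, stream):
    reader = pa.RecordBatchStreamReader(stream)
    batches = list(reader)                          # batches in arrival order
    num = read_int(stream)                          # trailing count of order indices
    order = [read_int(stream) for _ in range(num)]
    for arrival_index in order:                     # yield in logical partition order
        yield batches[arrival_index]
```

The trade-off is that this buffers every batch in Python before yielding anything, so keeping the accessor lets the caller decide when (and whether) to pay that cost.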


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219558311
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
+  }
+  partitionCount += 1
+
+  // After last batch, end the stream and write batch order
+  if (partitionCount == numPartitions) {
+batchWriter.end()
+out.writeInt(batchOrder.length)
+// Batch order indices are from 0 to N-1 batches, sorted by order they arrived
--- End diff --

How about something like `// Sort by (partition index, partition batch index) tuple to get the output global batch indexes`?
When I first read this code path I got confused myself, so I think we should spend a bit of time on the comment here.
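
As a plain-Python illustration of the bookkeeping (no Spark involved), one way to read those indices is:

```python
# Batches arrive tagged with (partition_index, batch_in_partition) keys, in arrival order.
arrival_keys = [(2, 0), (0, 0), (0, 1), (1, 0)]

# Pair each key with its arrival position, then sort by key: the result gives,
# for every logical output position, the arrival index of the batch that belongs there.
batch_order = [arrival_idx for _, arrival_idx in
               sorted((key, i) for i, key in enumerate(arrival_keys))]

assert batch_order == [1, 2, 3, 0]  # partition 0's two batches first, then 1, then 2
```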


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219556534
  
--- Diff: python/pyspark/serializers.py ---
@@ -208,8 +214,26 @@ def load_stream(self, stream):
         for batch in reader:
             yield batch
 
+        if self.load_batch_order:
+            num = read_int(stream)
+            self.batch_order = []
--- End diff --

If we're going to have `get_batch_order_and_reset` as a separate function, could we verify that `batch_order` is None before we reset it, and throw here if it's not? Just thinking of future folks who might have to debug something here.
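
A minimal sketch of the check being suggested, inside `load_stream` (the error type and message are hypothetical):

```python
if self.load_batch_order:
    if self.batch_order is not None:
        # A previous stream's order was never consumed via get_batch_order_and_reset()
        raise RuntimeError("batch_order already set; call "
                           "get_batch_order_and_reset() before loading another stream")
    num = read_int(stream)
    self.batch_order = [read_int(stream) for _ in xrange(num)]
```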


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219561178
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
--- End diff --

Could we call `i` something more descriptive, like `partition_batch_num` or similar?


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219557215
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+        df = self.spark.range(64, numPartitions=8).toDF("a")
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
+            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+            self.assertPandasEqual(pdf, pdf_arrow)
--- End diff --

This looks pretty similar to the kind of test case we could verify with something like hypothesis. Integrating hypothesis is probably too much work, but we could at least quickly explore the num-partitions space in a loop here. Would that help, do you think, @felixcheung?
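
A quick sketch of that loop, reusing the existing helpers (the partition counts here are arbitrary):

```python
def test_toPandas_batch_order(self):
    for num_parts in (1, 2, 8, 64, 512):
        df = self.spark.range(64, numPartitions=num_parts).toDF("a")
        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
            self.assertPandasEqual(pdf, pdf_arrow)
```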


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-09-21 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r219404072
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
 self.assertPandasEqual(pdf, df_from_python.toPandas())
 self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+        df = self.spark.range(64, numPartitions=8).toDF("a")
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
+            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+            self.assertPandasEqual(pdf, pdf_arrow)
--- End diff --

hm, is this test case "enough" to trigger any possible problem just by random chance? Would increasing the number of batches, or the number of records per batch, increase the chance of hitting a streaming-order or concurrency issue?


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-08-30 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r214131313
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
+  }
+  partitionCount += 1
+
+  // After last batch, end the stream and write batch order
+  if (partitionCount == numPartitions) {
+batchWriter.end()
+out.writeInt(batchOrder.length)
+// Batch order indices are from 0 to N-1 batches, sorted by order they arrived
--- End diff --

How about a slight change? `// Re-order batches according to these indices to build a table.`


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-08-30 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r214129436
  
--- Diff: python/pyspark/serializers.py ---
@@ -187,9 +187,15 @@ def loads(self, obj):
 
 class ArrowStreamSerializer(Serializer):
     """
-    Serializes Arrow record batches as a stream.
+    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
--- End diff --

Yeah, it's also used in the `createDataFrame` path, but that only uses `dump_stream`. Still, it seemed best to make this an optional feature of the serializer.


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-08-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r213862165
  
--- Diff: python/pyspark/serializers.py ---
@@ -187,9 +187,15 @@ def loads(self, obj):
 
 class ArrowStreamSerializer(Serializer):
     """
-    Serializes Arrow record batches as a stream.
+    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
--- End diff --

This is optional. Is there any other usage of `ArrowStreamSerializer` that doesn't need the ordering?


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-08-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22275#discussion_r213860328
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
 val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
 withAction("collectAsArrowToPython", queryExecution) { plan =>
-  PythonRDD.serveToStream("serve-Arrow") { out =>
+  PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+val out = new DataOutputStream(outputStream)
 val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 val arrowBatchRdd = toArrowBatchRdd(plan)
 val numPartitions = arrowBatchRdd.partitions.length
 
-// Store collection results for worst case of 1 to N-1 partitions
-val results = new Array[Array[Array[Byte]]](numPartitions - 1)
-var lastIndex = -1  // index of last partition written
+// Batches ordered by (index of partition, batch # in partition) tuple
+val batchOrder = new ArrayBuffer[(Int, Int)]()
+var partitionCount = 0
 
-// Handler to eagerly write partitions to Python in order
+// Handler to eagerly write batches to Python out of order
 def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
-  // If result is from next partition in order
-  if (index - 1 == lastIndex) {
+  if (arrowBatches.nonEmpty) {
 batchWriter.writeBatches(arrowBatches.iterator)
-lastIndex += 1
-// Write stored partitions that come next in order
-while (lastIndex < results.length && results(lastIndex) != null) {
-  batchWriter.writeBatches(results(lastIndex).iterator)
-  results(lastIndex) = null
-  lastIndex += 1
-}
-// After last batch, end the stream
-if (lastIndex == results.length) {
-  batchWriter.end()
+arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
+  }
+  partitionCount += 1
+
+  // After last batch, end the stream and write batch order
+  if (partitionCount == numPartitions) {
+batchWriter.end()
+out.writeInt(batchOrder.length)
+// Batch order indices are from 0 to N-1 batches, sorted by order they arrived
--- End diff --

nit: `Batch order indices are from 0 to N-1 batches, sorted by order they arrived. Re-sort indices to the correct order to build a table.`


---




[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...

2018-08-29 Thread BryanCutler
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/22275

[SPARK-25274][PYTHON][SQL] In toPandas with Arrow send out-of-order record batches to improve performance

## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out of order must be buffered before they can be sent to Python. This causes excess memory to be used in the driver JVM and increases the time it takes to complete, because data must sit in the JVM waiting for preceding partitions to come in.

This change sends out-of-order partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered in the JVM and there is no waiting on particular partitions, so performance is improved.
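
Conceptually, the Python side then has two pieces to work with: the Arrow batches in the order they arrived, and the trailing list of order indices. A sketch of the reassembly idea (not the exact code in this PR):

```python
import pyarrow as pa

def rebuild_table(batches, batch_order):
    # batches: record batches in JVM arrival order.
    # batch_order: for each output position, the arrival index of its batch.
    ordered = [batches[i] for i in batch_order]
    return pa.Table.from_batches(ordered)

# pdf = rebuild_table(batches, batch_order).to_pandas()
```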

Followup to #21546 

## How was this patch tested?

Added a new test with a large number of batches per partition to ensure they are put back in the correct order.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BryanCutler/spark arrow-toPandas-oo-batches-SPARK-25274

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22275


commit 2041317ba1efa0595431a900472b2e347c50d23c
Author: Bryan Cutler 
Date:   2018-08-29T21:26:49Z

changed toPandas to send out of order batches, followed by batch order indices

commit d6fefee68c30aa579b345c32d9f00b32bf9a505b
Author: Bryan Cutler 
Date:   2018-08-29T21:38:25Z

Consolidated BatchOrderSerializer into ArrowStreamSerializer and made optional




---
