[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784843#comment-16784843 ]

Bryan Cutler commented on SPARK-23961:
--------------------------------------

I could also reproduce this with a nearly identical error using the following:

{code}
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession\
        .builder\
        .appName("toLocalIterator_Test")\
        .getOrCreate()

# Large enough that the data cannot all fit in the socket receive buffer.
df = spark.range(1 << 16).select(rand())

it = df.toLocalIterator()

# Consume only the first row, then drop the iterator without exhausting it.
print(next(it))
it = None

# Give the JVM writing thread time to hit the closed connection and log the error.
time.sleep(5)
spark.stop()
{code}

I think there are a couple of issues with how this currently works. When 
toLocalIterator is called in Python, the Scala side also creates a local 
iterator and immediately starts a loop that consumes the entire iterator and 
writes everything to Python, with no synchronization with the Python iterator. 
The write only blocks when the socket receive buffer is full. Small examples 
work fine because all of the data fits in the read buffer, but the code above 
fails: the write blocks, the Python iterator stops reading and closes the 
connection, and the Scala side sees that as an error. I can work on a fix for 
this.
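
Roughly, the fix needs the serving thread to write a partition only after the 
Python side has asked for it. Here is a minimal, self-contained sketch of that 
kind of handshake, with plain Python sockets and threads standing in for the 
real Spark/Py4J plumbing; none of the names below are actual Spark APIs, it is 
only meant to illustrate the synchronization.

{code}
# Sketch only: a pull-based handshake where the writer sends one "partition"
# per explicit request, so a consumer that stops early never leaves the writer
# blocked mid-write on a connection that is about to be closed.
import socket
import struct
import threading

REQUEST_NEXT = b"\x01"   # consumer wants one more partition
STOP = b"\x00"           # consumer is done

def recv_exact(sock, n):
    """Read exactly n bytes (a single recv may return fewer)."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("connection closed")
        buf += chunk
    return buf

def serve_partitions(server_sock, partitions):
    """Stand-in for the JVM serving thread: write one partition per request."""
    conn, _ = server_sock.accept()
    with conn:
        for part in partitions:
            if recv_exact(conn, 1) != REQUEST_NEXT:
                return                          # consumer stopped early: exit quietly
            payload = part.encode("utf-8")
            conn.sendall(struct.pack(">I", len(payload)) + payload)
        if recv_exact(conn, 1) == REQUEST_NEXT:
            conn.sendall(struct.pack(">I", 0))  # zero length marks end of data

def local_iterator(sock):
    """Stand-in for the Python-side iterator: pull one partition at a time."""
    try:
        while True:
            sock.sendall(REQUEST_NEXT)
            (length,) = struct.unpack(">I", recv_exact(sock, 4))
            if length == 0:
                return
            yield recv_exact(sock, length).decode("utf-8")
    finally:
        try:
            sock.sendall(STOP)                  # tell the writer we are done
        except OSError:
            pass

if __name__ == "__main__":
    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    parts = ["partition-%d" % i for i in range(4)]
    threading.Thread(target=serve_partitions, args=(server, parts), daemon=True).start()

    client = socket.socket()
    client.connect(server.getsockname())
    it = local_iterator(client)
    print(next(it))   # consume only the first partition, like the repro above
    it = None         # dropping the iterator just stops requesting; no reset
{code}

The real change would live on the Scala serving side and in the PySpark 
iterator, but the key point is the same: the writer blocks on an explicit 
request rather than on the socket buffer, so an abandoned iterator simply 
stops requesting instead of resetting the connection mid-write.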

> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
>                 Key: SPARK-23961
>                 URL: https://issues.apache.org/jira/browse/SPARK-23961
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0
>            Reporter: Michel Lemay
>            Priority: Minor
>              Labels: DataFrame, pyspark
>
> Given a DataFrame, call toLocalIterator. If we do not consume all records, 
> it throws: 
> {quote}ERROR PythonRDD: Error while sending iterator
>  java.net.SocketException: Connection reset by peer: socket write error
>  at java.net.SocketOutputStream.socketWrite0(Native Method)
>  at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>  at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>  at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>  at java.io.DataOutputStream.write(DataOutputStream.java:107)
>  at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>  at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
>  at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
>  at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>  at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>  at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>  
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None  # Make the iterator go out of scope. Throws here.
> {quote}
>  
> Observations:
>  * Consuming all records does not throw. Taking only a subset of the 
> partitions creates the error.
>  * In another experiment, doing the same on a regular RDD works if we 
> cache/materialize it. If we do not cache the RDD, it throws similarly 
> (sketched below).
>  * It works in the Scala shell.
>  
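
The RDD experiment from the second observation above can be sketched roughly 
as follows; the data size and partition count are illustrative placeholders, 
not values from the original report.

{code}
# Illustrative sketch of the RDD experiment from the observations above.
# Size and partition count are placeholders, not from the original report.
import itertools
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("toLocalIterator_RDD_Test").getOrCreate()
sc = spark.sparkContext

# Cached/materialized RDD: partial consumption is reported to work.
rdd_cached = sc.parallelize(range(1 << 20), 64).cache()
rdd_cached.count()                           # materialize the cache first
it = rdd_cached.toLocalIterator()
print(len(list(itertools.islice(it, 20))))
it = None

# Uncached RDD: the same partial consumption is reported to throw similarly.
rdd_plain = sc.parallelize(range(1 << 20), 64)
it = rdd_plain.toLocalIterator()
print(len(list(itertools.islice(it, 20))))
it = None

spark.stop()
{code}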


