[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784843#comment-16784843 ]
Bryan Cutler commented on SPARK-23961:
--------------------------------------

I could also reproduce a nearly identical error using the following:

{code}
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession \
    .builder \
    .appName("toLocalIterator_Test") \
    .getOrCreate()

df = spark.range(1 << 16).select(rand())
it = df.toLocalIterator()
print(next(it))
it = None  # drop the iterator before it is fully consumed
time.sleep(5)
spark.stop()
{code}

I think there are a couple of issues with the way this currently works. When toLocalIterator is called in Python, the Scala side also creates a local iterator and immediately starts a loop that consumes the entire iterator and writes it all to Python, without any synchronization with the Python iterator. The write operation only blocks once the socket receive buffer is full. Small examples work fine because all the data fits in the read buffer, but the code above fails: the write becomes blocked, then the Python iterator stops reading and closes the connection, which the Scala side sees as an error. I can work on a fix for this.

> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
>                 Key: SPARK-23961
>                 URL: https://issues.apache.org/jira/browse/SPARK-23961
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0
>            Reporter: Michel Lemay
>            Priority: Minor
>              Labels: DataFrame, pyspark
>
> Given a DataFrame, use toLocalIterator.
> If we do not consume all records, it will throw:
> {quote}
> ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>     at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}
> import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None  # Make the iterator go out of scope. Throws here.
> {quote}
>
> Observations:
> * Consuming all records does not throw.
> Taking only a subset of the partitions creates the error.
> * In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws similarly.
> * It works in the Scala shell.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
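The failure mechanism described in the comment above — an eager writer blocked on a full socket receive buffer while the reader closes the connection — can be reproduced without Spark at all. The sketch below uses plain Python sockets (no Spark APIs; all names here are illustrative, not Spark internals): one thread plays the role of the Scala side, pushing all data with no synchronization, while the main thread consumes one chunk and then abandons the connection, just like `it = None` in the repro script.

```python
import socket
import threading

def eager_writer(sock, num_chunks, errors):
    # Stand-in for the Scala-side serving thread: write everything up front
    # with no coordination with the consumer. Once the peer's receive buffer
    # is full, sendall() blocks; when the reader then closes the connection,
    # the blocked write fails with a socket error.
    try:
        for _ in range(num_chunks):
            sock.sendall(b"x" * 65536)
    except OSError as exc:  # e.g. BrokenPipeError / ConnectionResetError
        errors.append(type(exc).__name__)
    finally:
        sock.close()

def run_demo():
    reader, writer = socket.socketpair()
    errors = []
    t = threading.Thread(target=eager_writer, args=(writer, 1024, errors))
    t.start()
    reader.recv(65536)  # consume only the first chunk, like print(next(it))
    reader.close()      # then drop the iterator, like `it = None`
    t.join()
    return errors

if __name__ == "__main__":
    print(run_demo())  # a socket error name, e.g. ['BrokenPipeError']
```

A fix along the lines the comment suggests would make the serving side pull-based: write a batch only after the Python iterator explicitly requests the next one, so abandoning the iterator can end the exchange cleanly instead of surfacing as a write error.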