[ https://issues.apache.org/jira/browse/SPARK-27039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783690#comment-16783690 ]

Bryan Cutler commented on SPARK-27039:
--------------------------------------

I was able to reproduce this in v2.4.0, but current master raises an error in 
the driver instead of returning an empty Pandas DataFrame. This is probably due 
to some of the recent changes to toPandas() with Arrow enabled.
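
For reference, here is roughly how a session would be configured to hit this (a 
sketch; the local master is my assumption, but the 1m limit matches the logs 
below):

{code:python}
from pyspark.sql import SparkSession

# Sketch of the assumed repro setup: spark.driver.maxResultSize must be in
# place before the driver starts, so set it when building the session.
spark = (SparkSession.builder
         .master("local[8]")                          # assumption: local mode
         .config("spark.driver.maxResultSize", "1m")  # the 1 MiB limit from the logs
         .getOrCreate())
{code}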

{noformat}
In [4]: spark.conf.set('spark.sql.execution.arrow.enabled', True)

In [5]: import pyspark.sql.functions as F
   ...: df = spark.range(1000 * 1000)
   ...: df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()
   ...: 
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 1 tasks (13.2 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 2 tasks (26.4 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
Exception in thread "serve-Arrow" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (13.2 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1938)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1926)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1925)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:935)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:935)
        at scala.Option.foreach(Option.scala:274)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:935)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2155)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2104)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2093)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:746)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2008)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3300)
        at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3265)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$2(PythonRDD.scala:442)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$1(PythonRDD.scala:444)
        at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$1$adapted(PythonRDD.scala:439)
        at org.apache.spark.api.python.PythonServer$$anon$3.run(PythonRDD.scala:890)
/home/bryan/git/spark/python/pyspark/sql/dataframe.py:2129: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  warnings.warn(msg)
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 3 tasks (39.6 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
[Stage 0:==>                (1 + 7) / 8][Stage 1:>                  (0 + 8) / 8]
---------------------------------------------------------------------------
EOFError                                  Traceback (most recent call last)
<ipython-input-5-0e37c5091113> in <module>()
      1 import pyspark.sql.functions as F
      2 df = spark.range(1000 * 1000)
----> 3 df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()

/home/bryan/git/spark/python/pyspark/sql/dataframe.pyc in toPandas(self)
   2111                         _check_dataframe_localize_timestamps
   2112                     import pyarrow
-> 2113                     batches = self._collectAsArrow()
   2114                     if len(batches) > 0:
   2115                         table = pyarrow.Table.from_batches(batches)

/home/bryan/git/spark/python/pyspark/sql/dataframe.pyc in _collectAsArrow(self)
   2170 
   2171         # Collect list of un-ordered batches where last element is a list of correct order indices
-> 2172         results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
   2173         batches = results[:-1]
   2174         batch_order = results[-1]

/home/bryan/git/spark/python/pyspark/serializers.pyc in load_stream(self, stream)
    208 
    209         # load the batch order indices
--> 210         num = read_int(stream)
    211         batch_order = []
    212         for i in xrange(num):

/home/bryan/git/spark/python/pyspark/serializers.pyc in read_int(stream)
    787     length = stream.read(4)
    788     if not length:
--> 789         raise EOFError
    790     return struct.unpack("!i", length)[0]
    791 

EOFError: 

In [6]: 19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 4 tasks (52.8 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
[Stage 0:==>                (1 + 7) / 8][Stage 1:>                  (0 +In [6]: 

In [6]: 

In [6]: df_pd.info()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-6-0aeb36a3e0a1> in <module>()
----> 1 df_pd.info()

NameError: name 'df_pd' is not defined
{noformat}

The error raised could be improved to something more descriptive than an 
EOFError, though. We won't be able to backport these changes to 2.4; is this 
something you can work around until 3.0 is available, [~peay]?
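
As a possible interim workaround on 2.4 (a sketch based on the behavior 
described in this issue, not a tested fix): since the non-Arrow path does fail 
as expected, disabling Arrow around the oversized collect makes toPandas() 
surface the maxResultSize error instead of returning an empty frame.

{code:python}
# Sketch: temporarily disable Arrow so toPandas() raises on oversized results
# (per this report, the non-Arrow path fails as expected), then restore the
# previous setting.
prev = spark.conf.get("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
try:
    df_pd = df.toPandas()  # now surfaces the maxResultSize error to Python
finally:
    spark.conf.set("spark.sql.execution.arrow.enabled", prev)
{code}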

> toPandas with Arrow swallows maxResultSize errors
> -------------------------------------------------
>
>                 Key: SPARK-27039
>                 URL: https://issues.apache.org/jira/browse/SPARK-27039
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: peay
>            Priority: Minor
>
> I am running the following simple {{toPandas}} call with {{spark.driver.maxResultSize}} set to 1 MB:
> {code:python}
> import pyspark.sql.functions as F
> df = spark.range(1000 * 1000)
> df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()
> {code}
>  
> With {{spark.sql.execution.arrow.enabled}} set to {{true}}, this returns an empty Pandas DataFrame without any error:
> {code:python}
> df_pd.info()
> # <class 'pandas.core.frame.DataFrame'>
> # Index: 0 entries
> # Data columns (total 2 columns):
> # id      0 non-null object
> # test    0 non-null object
> # dtypes: object(2)
> # memory usage: 0.0+ bytes
> {code}
> The driver stderr does have an error, and so does the Spark UI:
> {code:java}
> ERROR TaskSetManager: Total size of serialized results of 1 tasks (52.8 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
> ERROR TaskSetManager: Total size of serialized results of 2 tasks (105.7 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
> Exception in thread "serve-Arrow" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (52.8 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
>  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
>  at scala.Option.foreach(Option.scala:257)
>  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>  at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3313)
>  at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3282)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply$mcV$sp(PythonRDD.scala:435)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply(PythonRDD.scala:435)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply(PythonRDD.scala:435)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$6.apply(PythonRDD.scala:436)
>  at org.apache.spark.api.python.PythonRDD$$anonfun$6.apply(PythonRDD.scala:432)
>  at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:862)
> {code}
> With {{spark.sql.execution.arrow.enabled}} set to {{false}}, the Python call to {{toPandas}} does fail as expected.


