[ https://issues.apache.org/jira/browse/SPARK-27039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783690#comment-16783690 ]
Bryan Cutler commented on SPARK-27039:
--------------------------------------

I was able to reproduce in v2.4.0, but it looks like current master raises an error in the driver and does not return an empty Pandas DataFrame. This is probably due to some of the recent changes in toPandas() with Arrow enabled.

{noformat}
In [4]: spark.conf.set('spark.sql.execution.arrow.enabled', True)

In [5]: import pyspark.sql.functions as F
   ...: df = spark.range(1000 * 1000)
   ...: df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()
   ...:
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 1 tasks (13.2 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 2 tasks (26.4 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
Exception in thread "serve-Arrow" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (13.2 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1938)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1926)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1925)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:935)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:935)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2104)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2093)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:746)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2008)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3300)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3265)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$2(PythonRDD.scala:442)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$1(PythonRDD.scala:444)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$serveToStream$1$adapted(PythonRDD.scala:439)
	at org.apache.spark.api.python.PythonServer$$anon$3.run(PythonRDD.scala:890)
/home/bryan/git/spark/python/pyspark/sql/dataframe.py:2129: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue.
Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  warnings.warn(msg)
19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 3 tasks (39.6 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)
[Stage 0:==>          (1 + 7) / 8][Stage 1:>            (0 + 8) / 8]
---------------------------------------------------------------------------
EOFError                                  Traceback (most recent call last)
<ipython-input-5-0e37c5091113> in <module>()
      1 import pyspark.sql.functions as F
      2 df = spark.range(1000 * 1000)
----> 3 df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()

/home/bryan/git/spark/python/pyspark/sql/dataframe.pyc in toPandas(self)
   2111                 _check_dataframe_localize_timestamps
   2112             import pyarrow
-> 2113             batches = self._collectAsArrow()
   2114             if len(batches) > 0:
   2115                 table = pyarrow.Table.from_batches(batches)

/home/bryan/git/spark/python/pyspark/sql/dataframe.pyc in _collectAsArrow(self)
   2170 
   2171         # Collect list of un-ordered batches where last element is a list of correct order indices
-> 2172         results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
   2173         batches = results[:-1]
   2174         batch_order = results[-1]

/home/bryan/git/spark/python/pyspark/serializers.pyc in load_stream(self, stream)
    208 
    209         # load the batch order indices
--> 210         num = read_int(stream)
    211         batch_order = []
    212         for i in xrange(num):

/home/bryan/git/spark/python/pyspark/serializers.pyc in read_int(stream)
    787     length = stream.read(4)
    788     if not length:
--> 789         raise EOFError
    790     return struct.unpack("!i", length)[0]
    791 

EOFError:

In [6]: 19/03/04 10:54:56 ERROR TaskSetManager: Total size of serialized results of 4 tasks (52.8 MiB) is bigger than spark.driver.maxResultSize (1024.0 KiB)

In [6]: df_pd.info()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-6-0aeb36a3e0a1> in <module>()
----> 1 df_pd.info()

NameError: name 'df_pd' is not defined
{noformat}

The error could be improved to something other than an EOFError, though. We won't be able to backport these changes to 2.4; is this something you can work around until 3.0 is available, [~peay]?
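One possible stopgap on 2.4 is sketched below: temporarily disable Arrow around the collect, since the non-Arrow path surfaces the maxResultSize error as expected. This is a minimal sketch, assuming the slower non-Arrow collect path is acceptable for the affected queries; the {{arrow_disabled}} helper is ad hoc, not a PySpark API.

{code:python}
from contextlib import contextmanager

@contextmanager
def arrow_disabled(spark):
    # Ad-hoc helper (not part of PySpark): temporarily route toPandas()
    # through the non-Arrow collect path, which raises on maxResultSize
    # instead of silently returning an empty DataFrame on 2.4.
    original = spark.conf.get("spark.sql.execution.arrow.enabled", "false")
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    try:
        yield
    finally:
        spark.conf.set("spark.sql.execution.arrow.enabled", original)

# Usage: fails loudly instead of producing an empty frame.
with arrow_disabled(spark):
    df_pd = df.toPandas()
{code}

Raising {{spark.driver.maxResultSize}} is the other obvious option when the collected result genuinely needs to be that large.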
> toPandas with Arrow swallows maxResultSize errors
> -------------------------------------------------
>
>                 Key: SPARK-27039
>                 URL: https://issues.apache.org/jira/browse/SPARK-27039
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: peay
>            Priority: Minor
>
> I am running the following simple `toPandas` with {{maxResultSize}} set to 1mb:
> {code:java}
> import pyspark.sql.functions as F
> df = spark.range(1000 * 1000)
> df_pd = df.withColumn("test", F.lit("this is a long string that should make the resulting dataframe too large for maxResult which is 1m")).toPandas()
> {code}
>
> With {{spark.sql.execution.arrow.enabled}} set to {{true}}, this returns an empty Pandas dataframe without any error:
> {code:python}
> df_pd.info()
> # <class 'pandas.core.frame.DataFrame'>
> # Index: 0 entries
> # Data columns (total 2 columns):
> # id      0 non-null object
> # test    0 non-null object
> # dtypes: object(2)
> # memory usage: 0.0+ bytes
> {code}
>
> The driver stderr does have an error, and so does the Spark UI:
> {code:java}
> ERROR TaskSetManager: Total size of serialized results of 1 tasks (52.8 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
> ERROR TaskSetManager: Total size of serialized results of 2 tasks (105.7 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
> Exception in thread "serve-Arrow" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (52.8 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3313)
> 	at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3282)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply$mcV$sp(PythonRDD.scala:435)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply(PythonRDD.scala:435)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$6$$anonfun$apply$1.apply(PythonRDD.scala:435)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$6.apply(PythonRDD.scala:436)
> 	at org.apache.spark.api.python.PythonRDD$$anonfun$6.apply(PythonRDD.scala:432)
> 	at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:862)
> {code}
>
> With {{spark.sql.execution.arrow.enabled}} set to {{false}}, the Python call to {{toPandas}} does fail as expected.
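Until the fix lands in 3.0, users on 2.4 who must keep Arrow enabled can at least detect the silent truncation described above with a sanity check along these lines (a minimal sketch, assuming an extra {{count()}} job over the data is affordable):

{code:python}
expected = df.count()   # extra Spark job just to get the expected row count
df_pd = df.toPandas()   # on 2.4 with Arrow this may come back empty
if len(df_pd) != expected:
    raise RuntimeError(
        "toPandas() returned %d of %d rows; check the driver log for "
        "spark.driver.maxResultSize errors" % (len(df_pd), expected))
{code}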