[ https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241342#comment-17241342 ]

Darshat commented on SPARK-33576:
---------------------------------

I turned on JVM exception logging; this is the full trace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 58.0 failed 4 times, most recent failure: Lost task 1.3 in stage 58.0 (TID 3336, 10.139.64.5, executor 7): org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
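
For context on the bottom Python frames: the "for batch in reader:" loop in serializers.py iterates a pyarrow record-batch stream reader over the executor-to-worker channel, and the OSError surfaces from that read. A minimal, self-contained sketch of that read path (illustrative only, not taken from this job; an in-memory buffer stands in for the worker socket):

{code:python}
import io

import pyarrow as pa

# Write a tiny Arrow IPC stream into memory; in Spark this stream would be the
# data the executor sends to the Python worker for the pandas UDF.
batch = pa.record_batch([pa.array(["provider0", "provider1"])], names=["providerid"])
sink = io.BytesIO()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# Read it back the way serializers.py does ("for batch in reader:").
# The "Invalid IPC message: negative bodyLength" OSError is raised from this
# read when a message header carries an invalid (negative) body length.
reader = pa.ipc.open_stream(sink.getvalue())
for b in reader:
    print(b.num_rows)
{code}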

> PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC 
> message: negative bodyLength'.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33576
>                 URL: https://issues.apache.org/jira/browse/SPARK-33576
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1
>         Environment: Databricks runtime 7.3
> Spark 3.0.1
> Scala 2.12
>            Reporter: Darshat
>            Priority: Major
>
> Hello,
> We are using Databricks on Azure to process large amounts of e-commerce data. The Databricks runtime is 7.3, which includes Apache Spark 3.0.1 and Scala 2.12.
> During processing, there is a groupby operation on the DataFrame that consistently fails with an exception of this type:
>  
> {color:#ff0000}PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
> Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 654, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 646, in process
>     serializer.dump_stream(out_iter, outfile)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
>     timely_flush_timeout_ms=self.timely_flush_timeout_ms)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
>     for batch in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
>     for series in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 412, in __iter__
>   File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> OSError: Invalid IPC message: negative bodyLength{color}
>  
> Code that causes this:
> {color:#ff0000}x = df.groupby('providerid').apply(domain_features){color}
> {color:#ff0000}display(x.info()){color}
> DataFrame size: 22 million rows, 31 columns.
> One of the columns is a string ('providerid') on which we do a groupby followed by an apply operation. There are 3 distinct provider ids in this set. While trying to enumerate/count the results, we get this exception.
> We've put all possible checks in the code for null values or corrupt data, and we are not able to trace this to application-level code. I hope we can get some help troubleshooting this, as it is a blocker for rolling out at scale.
> The cluster has 8 nodes plus the driver, each with 28 GB RAM. I can provide any other settings that could be useful.
> Hope to get some insight into the problem.
> Thanks,
> Darshat Shah
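
A minimal sketch of the grouped-map pattern the description refers to. Only the names domain_features and providerid come from the report; the sample data, schema, and per-group logic are placeholders, and applyInPandas is the Spark 3.0 form of the groupby(...).apply(...) call quoted above. With 22 million rows and only 3 distinct provider ids, each group handed to the UDF is very large, and it is during the Arrow transfer of these groups that the IPC read in the traceback fails.

{code:python}
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder data with the same shape as described: a string key with very few
# distinct values, so each group is a large fraction of the whole DataFrame.
df = spark.createDataFrame(
    [("provider%d" % (i % 3), float(i)) for i in range(1000)],
    ["providerid", "value"],
)

def domain_features(pdf: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for the real per-group feature computation.
    pdf = pdf.copy()
    pdf["value_mean"] = pdf["value"].mean()
    return pdf

# Spark 3.0 grouped-map API; df.groupby('providerid').apply(domain_features) in the
# report is the older form of the same operation. Each group is serialized to the
# Python worker as Arrow record batches, which is where the traceback's IPC read runs.
result = df.groupby("providerid").applyInPandas(
    domain_features, schema="providerid string, value double, value_mean double"
)
result.count()
{code}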


