[ https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241342#comment-17241342 ]
Darshat commented on SPARK-33576: --------------------------------- I turned on the jvm exception logging, and this is the full trace: JVM stacktrace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 58.0 failed 4 times, most recent failure: Lost task 1.3 in stage 58.0 (TID 3336, 10.139.64.5, executor 7): org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 654, in main process() File "/databricks/spark/python/pyspark/worker.py", line 646, in process serializer.dump_stream(out_iter, outfile) File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream timely_flush_timeout_ms=self.timely_flush_timeout_ms) File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream for batch in iterator: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches for series in iterator: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream for batch in batches: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream for batch in batches: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream for batch in reader: File "pyarrow/ipc.pxi", line 412, in __iter__ File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status OSError: Invalid IPC message: negative bodyLength at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49) at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144) at org.apache.spark.scheduler.Task.run(Task.scala:117) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466) at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) Caused by: org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. 
Full traceback below: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 654, in main process() File "/databricks/spark/python/pyspark/worker.py", line 646, in process serializer.dump_stream(out_iter, outfile) File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream timely_flush_timeout_ms=self.timely_flush_timeout_ms) File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream for batch in iterator: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches for series in iterator: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream for batch in batches: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream for batch in batches: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream for batch in reader: File "pyarrow/ipc.pxi", line 412, in __iter__ File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status OSError: Invalid IPC message: negative bodyLength at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144) at org.apache.spark.scheduler.Task.run(Task.scala:117) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) > PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC > message: negative bodyLength'. > --------------------------------------------------------------------------------------------------------- > > Key: SPARK-33576 > URL: https://issues.apache.org/jira/browse/SPARK-33576 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 3.0.1 > Environment: Databricks runtime 7.3 > Spark 3.0.1 > Scala 2.12 > Reporter: Darshat > Priority: Major > > Hello, > We are using Databricks on Azure to process a large amount of e-commerce data. > Databricks runtime is 7.3, which includes Apache Spark 3.0.1 and Scala 2.12. 
> During processing, there is a groupby operation on the DataFrame that > consistently gets an exception of this type: > > {color:#ff0000}PythonException: An exception was thrown from a UDF: 'OSError: > Invalid IPC message: negative bodyLength'. Full traceback below: Traceback > (most recent call last): File "/databricks/spark/python/pyspark/worker.py", > line 654, in main process() File > "/databricks/spark/python/pyspark/worker.py", line 646, in process > serializer.dump_stream(out_iter, outfile) File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in > dump_stream timely_flush_timeout_ms=self.timely_flush_timeout_ms) File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in > dump_stream for batch in iterator: File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in > init_stream_yield_batches for series in iterator: File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in > load_stream for batch in batches: File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in > load_stream for batch in batches: File > "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in > load_stream for batch in reader: File "pyarrow/ipc.pxi", line 412, in > __iter__ File "pyarrow/ipc.pxi", line 432, in > pyarrow.lib._CRecordBatchReader.read_next_batch File "pyarrow/error.pxi", > line 99, in pyarrow.lib.check_status OSError: Invalid IPC message: negative > bodyLength{color} > > Code that causes this: > {color:#ff0000}x = df.groupby('providerid').apply(domain_features){color} > {color:#ff0000}display(x.info()){color} > Dataframe size - 22 million rows, 31 columns > One of the columns is a string ('providerid') on which we do a groupby > followed by an apply operation. There are 3 distinct provider ids in this > set. While trying to enumerate/count the results, we get this exception. 
> We've put all possible checks in the code for null values or corrupt data, > and we are not able to trace this to application-level code. I hope we can > get some help troubleshooting this, as it is a blocker for rolling out at > scale. > The cluster has 8 nodes + driver, each with 28GB RAM. I can provide any other > settings that could be useful. > Hope to get some insights into the problem. > Thanks, > Darshat Shah -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org