[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darshat updated SPARK-33576:
----------------------------
    Description: 
Hello,

We are using Databricks on Azure to process a large amount of ecommerce data. 
The Databricks runtime is 7.3, which includes Apache Spark 3.0.1 and Scala 2.12.

During processing, a groupby operation on the DataFrame consistently fails with 
an exception of this type:

 

{color:#ff0000}PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength{color}

 

Code that causes this (domain_features adds a couple of computed columns to the 
DataFrame):

{color:#ff0000}x = df.groupby('providerid').apply(domain_features){color}

{color:#ff0000}display(x.info()){color}
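
For reference, here is a minimal self-contained sketch of the same pattern. The 
real domain_features and the real columns are not shown; the column names and the 
placeholder implementation below are only illustrative, and the sketch uses the 
Spark 3.0 applyInPandas spelling of the grouped-map API.

{code:python}
# Minimal sketch of the pattern above (Spark 3.0.x + pyarrow), for context only.
# The real domain_features is not shown; this placeholder just adds computed columns.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tiny stand-in for the 22M-row DataFrame; only the grouping column name matches the report.
df = spark.createDataFrame(
    [("p1", 10.0), ("p2", 20.0), ("p3", 30.0)],
    ["providerid", "amount"],
)

def domain_features(pdf: pd.DataFrame) -> pd.DataFrame:
    # Placeholder feature computation: add a couple of derived columns.
    pdf["amount_sq"] = pdf["amount"] ** 2
    return pdf

# Spark 3.0 grouped-map form; df.groupby(...).apply(grouped_map_udf) is the older spelling.
x = df.groupby("providerid").applyInPandas(
    domain_features, schema="providerid string, amount double, amount_sq double"
)
x.show()
{code}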

DataFrame size: 22 million rows, 31 columns.
One of the columns is a string ('providerid') on which we do a groupby followed 
by an apply operation. There are 3 distinct provider ids in this set, so each 
group holds roughly 7 million rows. While trying to enumerate/count the results, 
we get this exception.
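
In case the error comes from each provider's roughly 7 million rows being 
serialized as a single oversized Arrow batch, one workaround we could try is to 
split each group with an extra salt column before the apply. A sketch, assuming 
domain_features computes features row-wise (so the extra split does not change 
its output) and reusing the placeholder names from the sketch above:

{code:python}
# Workaround sketch only. Assumptions: domain_features is row-wise, so splitting a
# provider's rows into smaller sub-groups does not change its output; 'df' and
# 'domain_features' are the placeholders from the previous sketch.
from pyspark.sql import functions as F

n_buckets = 64  # assumption: tune so each (providerid, bucket) group stays small

df_salted = df.withColumn("bucket", (F.rand(seed=42) * n_buckets).cast("int"))

x = (
    df_salted.groupby("providerid", "bucket")
    .applyInPandas(
        domain_features,
        schema="providerid string, amount double, bucket int, amount_sq double",
    )
    .drop("bucket")  # the salt column is only used to shrink each Arrow batch
)
x.show()
{code}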

We've put all possible checks in the code for null values or corrupt data, and we 
are not able to trace this to application-level code. We hope to get some help 
troubleshooting this, as it is a blocker for rolling out at scale.

The cluster has 8 worker nodes plus a driver, each with 28 GB RAM. I can provide 
any other settings that could be useful.
Hope to get some insights into the problem.

Thanks,

Darshat Shah

> PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC 
> message: negative bodyLength'.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-33576
>                 URL: https://issues.apache.org/jira/browse/SPARK-33576
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1
>         Environment: Databricks runtime 7.3
> Spark 3.0.1
> Scala 2.12
>            Reporter: Darshat
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
