[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-12-11 Thread Bryan Cutler (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17248064#comment-17248064
 ] 

Bryan Cutler commented on SPARK-33576:
--

[~darshats] I believe the only current workaround is to further split your 
groups with other keys to get under the 2GB limit. Taking advantage of the new 
Arrow improvements for this would most likely require some work on the Spark 
side, but I'd have to look into it more.
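
For illustration, a minimal sketch of that workaround, using the reporter's df and domain_features names and assuming the UDF can be computed on a subset of a provider's rows; the salt column and NUM_SALTS are hypothetical additions:

{code:python}
import pyspark.sql.functions as F

# Hypothetical salt: split each providerid group into NUM_SALTS smaller sub-groups
# so that every (providerid, salt) group serializes to well under the 2GB Arrow limit.
NUM_SALTS = 64

salted = df.withColumn("salt", (F.rand(seed=42) * NUM_SALTS).cast("int"))

# Run the same grouped-map UDF on the finer groups; partial results per provider
# would need to be combined afterwards if domain_features expects whole groups.
x = salted.groupby("providerid", "salt").apply(domain_features)
{code}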

> PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC 
> message: negative bodyLength'.
> -
>
> Key: SPARK-33576
> URL: https://issues.apache.org/jira/browse/SPARK-33576
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1
> Environment: Databricks runtime 7.3
> Spark 3.0.1
> Scala 2.12
>Reporter: Darshat
>Priority: Major
>
> Hello,
> We are using Databricks on Azure to process a large amount of ecommerce data.
> The Databricks runtime is 7.3, which includes Apache Spark 3.0.1 and Scala 2.12.
> During processing, a groupby operation on the DataFrame consistently gets an
> exception of this type:
>  
> PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC
> message: negative bodyLength'. Full traceback below:
> Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 654, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 646, in process
>     serializer.dump_stream(out_iter, outfile)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
>     timely_flush_timeout_ms=self.timely_flush_timeout_ms)
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
>     for batch in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
>     for series in iterator:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
>     for batch in batches:
>   File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 412, in __iter__
>   File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> OSError: Invalid IPC message: negative bodyLength
>  
> Code that causes this:
> x = df.groupby('providerid').apply(domain_features)
> display(x.info())
> DataFrame size: 22 million rows, 31 columns.
> One of the columns is a string ('providerid') on which we do a groupby
> followed by an apply operation. There are 3 distinct provider ids in this
> set. While trying to enumerate/count the results, we get this exception.
> We've put all possible checks in the code for null values or corrupt data,
> and we are not able to trace this to application-level code. I hope we can
> get some help troubleshooting this, as it is a blocker for rolling out at
> scale.
> The cluster has 8 nodes plus the driver, all with 28GB RAM. I can provide any
> other settings that could be useful.
> Hope to get some insights into the problem.
> Thanks,
> Darshat Shah






[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-12-02 Thread Darshat (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242493#comment-17242493
 ] 

Darshat commented on SPARK-33576:
-

Thanks [~bryanc], yes, this does look similar, if not the same issue. With the 
Arrow fixes so far, is there a workaround for it?




[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-12-01 Thread Bryan Cutler (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241769#comment-17241769
 ] 

Bryan Cutler commented on SPARK-33576:
--

Is this due to the 2GB limit? As in 
https://issues.apache.org/jira/browse/SPARK-32294 and 
https://issues.apache.org/jira/browse/ARROW-4890




[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-12-01 Thread Darshat (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241683#comment-17241683
 ] 

Darshat commented on SPARK-33576:
-

I set the following options to see if they'd help, but I still get the same error:

spark.sql.execution.arrow.maxRecordsPerBatch 100
spark.sql.pyspark.jvmStacktrace.enabled true
spark.driver.maxResultSize 18g
spark.reducer.maxSizeInFlight 1024m
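
For reference, a minimal sketch of applying those options when building the session (on Databricks they would normally be set in the cluster's Spark config instead); the values are the ones listed above:

{code:python}
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100")
    .config("spark.sql.pyspark.jvmStacktrace.enabled", "true")
    .config("spark.driver.maxResultSize", "18g")
    .config("spark.reducer.maxSizeInFlight", "1024m")
    .getOrCreate()
)
{code}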




[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-12-01 Thread Darshat (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241342#comment-17241342
 ] 

Darshat commented on SPARK-33576:
-

I turned on the JVM exception logging, and this is the full trace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 58.0 failed 4 times, most recent failure: Lost task 1.3 in stage 58.0 (TID 3336, 10.139.64.5, executor 7): org.apache.spark.api.python.PythonException: 'OSError: Invalid IPC message: negative bodyLength'. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 281, in dump_stream
    timely_flush_timeout_ms=self.timely_flush_timeout_ms)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 97, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 271, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 287, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 228, in load_stream
    for batch in batches:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 412, in __iter__
  File "pyarrow/ipc.pxi", line 432, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
OSError: Invalid IPC message: negative bodyLength
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:660)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at

[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-11-30 Thread Darshat (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240896#comment-17240896
 ] 

Darshat commented on SPARK-33576:
-

Hi Hyukjin,

Thanks for your reply.

I cannot repro this on a smaller set, but it is a consistent repro on the larger 
data. For example, if the input data frame has only 3 million rows, it does not 
occur. Also, the data set is partitioned on the group-by column (providerid) 
before the groupby call, so the data for each providerid is on one node.

I have tried toggling the Arrow optimization setting, but it happens whether the 
setting is on or off.

My suspicion is that somehow the data transfer to the driver is not completing. 
Could it be a timeout in receiving the grouped results? Are there any Spark 
settings we could try changing?
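
For reference, a minimal sketch of the pattern described above, using the names from the original snippet (df, domain_features); the final count is only a stand-in for the enumeration step where the error surfaces:

{code:python}
# Pre-partition on the grouping column so each provider's data is co-located,
# then run the grouped-map UDF and enumerate the result.
repartitioned = df.repartition("providerid")
x = repartitioned.groupby("providerid").apply(domain_features)
x.count()  # the IPC error appears while consuming the grouped results
{code}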




[jira] [Commented] (SPARK-33576) PythonException: An exception was thrown from a UDF: 'OSError: Invalid IPC message: negative bodyLength'.

2020-11-29 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240451#comment-17240451
 ] 

Hyukjin Kwon commented on SPARK-33576:
--

- Are you able to provide a full reproducer with smaller data?
- Does it happen consistently with whichever code you run that uses pandas / 
Arrow?
- If it reproduces non-deterministically, is it dependent on the code or the 
data?
