[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251632#comment-17251632
 ] 

Dmitry Kravchuk commented on ARROW-4890:
----------------------------------------

Finally I've tried pandas_udf with a 1.72 GB dataset, pyarrow version 2.0.0 and 
ARROW_PRE_0_15_IPC_FORMAT=1 set - spark-submit returns the error "OSError: 
Invalid IPC message: negative bodyLength" (see detailed log number 4 in my 
previous messages).

Any thoughts?

 
{code:python|borderStyle=solid}
import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

def analyze(spark):

    pdf1 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
        columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
    )
    df1 = spark.createDataFrame(
        pd.concat([pdf1 for i in range(429)]).reset_index()
    ).drop('index')

    pdf2 = pd.DataFrame(
        [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno",
          "abcdefghijklmno"]],
        columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
    )
    df2 = spark.createDataFrame(
        pd.concat([pdf2 for i in range(48993)]).reset_index()
    ).drop('index')
    df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

    def myudf(df):
        # set the legacy IPC format flag inside the worker process
        import os
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        return df

    df4 = df3 \
        .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
        .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
        .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
        .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
        .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
        .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
        .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
        .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
        .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
        .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
    df4.printSchema()  # printSchema() prints directly and returns None

    udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

    df5 = df4.groupBy('df1_c1').apply(udf)
    print('df5.count()', df5.count())
{code}
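
One more note on the environment variable: setting it inside the UDF body (as in the snippet above) only runs once the worker is already deserializing the Arrow stream, so my assumption is that it may simply be too late. Below is a minimal sketch of propagating it through Spark conf instead, before the session and the executors start (my own sketch, not something already tried in this thread):

{code:python}
# Minimal sketch, assuming ARROW_PRE_0_15_IPC_FORMAT has to be visible to the
# driver and to the Python workers before the Arrow stream serializer is created.
import os
from pyspark.sql import SparkSession

# Driver side: export the flag before the SparkSession exists.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    SparkSession.builder
    # Executor side: spark.executorEnv.* exports the variable to executor processes.
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    # On YARN in cluster mode the application master needs it as well.
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
{code}

The same effect should be reachable with --conf on spark-submit or by exporting the variable in conf/spark-env.sh; whether that changes the "negative bodyLength" error with pyarrow 2.0.0 is a separate question.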
 

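A hypothetical diagnostic for the size angle (again only a sketch of mine): in the repro above every row carries df1_c1 == 1234567, so the inner join produces about 21 million rows (429 * 48993) that all land in a single group, i.e. a single GROUPED_MAP call and a single very large Arrow batch. Checking per-group row counts makes that visible:

{code:python}
# Hypothetical check, run against the df3 built in the snippet above.
from pyspark.sql import functions as F

# Row count per grouping key; here one key holds the entire joined dataset.
df3.groupBy('df1_c1').count().orderBy(F.desc('count')).show(10, truncate=False)
{code}

If one group's batch is what grows past a signed 32-bit length somewhere in the IPC path, that would at least be consistent with the negative lengths in both error messages, but that part is only a guess.
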
> [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> -----------------------------------------------------------------------------
>
>                 Key: ARROW-4890
>                 URL: https://issues.apache.org/jira/browse/ARROW-4890
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.8.0
>         Environment: Cloudera cdh5.13.3
> Cloudera Spark 2.3.0.cloudera3
>            Reporter: Abdeali Kothari
>            Priority: Major
>         Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
>
> Creating this in the Arrow project as the traceback seems to suggest this is an 
> issue in Arrow.
>  This is a continuation of the conversation at 
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:
> {noformat}
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> as the size of the dataset I want to group on increases. Here is a code 
> snippet where I can reproduce this.
>  Note: My actual dataset is much larger, has many more unique IDs, and is a 
> valid use case where I cannot simplify this groupby in any way. I have 
> stripped out all the logic to make this example as simple as I could.
> {code:java}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> pdf1 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>       columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
> pdf2 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>       columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
> def myudf(df):
>     return df
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per 
> executor too.



