Nasir Ali created SPARK-28502:
---------------------------------

             Summary: Error with struct conversion while using pandas_udf
                 Key: SPARK-28502
                 URL: https://issues.apache.org/jira/browse/SPARK-28502
             Project: Spark
          Issue Type: Question
          Components: PySpark
    Affects Versions: 2.4.3
         Environment: OS: Ubuntu
                      Python: 3.6
            Reporter: Nasir Ali


What I am trying to do: group data into time intervals (e.g., a 15-day 
window) and perform some operations on the dataframe using (pandas) UDFs. I am 
new to PySpark and don't know if there is a better/cleaner way to do this.

Below is the sample code I tried and the error message I am getting.

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

sparkSession = SparkSession.builder.getOrCreate()

df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
                                   (13.00, "2018-03-11T12:27:18+00:00"),
                                   (25.00, "2018-03-12T11:27:18+00:00"),
                                   (20.00, "2018-03-13T15:27:18+00:00"),
                                   (17.00, "2018-03-14T12:27:18+00:00"),
                                   (99.00, "2018-03-15T11:27:18+00:00"),
                                   (156.00, "2018-03-22T11:27:18+00:00"),
                                   (17.00, "2018-03-31T11:27:18+00:00"),
                                   (25.00, "2018-03-15T11:27:18+00:00"),
                                   (25.00, "2018-03-16T11:27:18+00:00")],
                                  ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

# Output schema of the grouped-map UDF; "id" holds doubles, so DoubleType is used.
schema = StructType([
    StructField("id", DoubleType()),
    StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(pdf):
    # some computation on the group's pandas DataFrame
    return pdf

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: 
timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}
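The error appears to come from the window grouping expression, which is a 
struct<start, end> column that the Arrow conversion cannot handle in 2.4. A 
possible workaround (just a sketch, not verified on 2.4.3) is to group on the 
window's start timestamp instead of the struct itself, so no struct column is 
passed to the pandas UDF. The names "df2", "win_start" and "flat_schema" below 
are illustrative, not from the original code:
{code:python}
# Sketch of a possible workaround: group on the window's start timestamp
# (a plain timestamp column) instead of the struct<start, end> key.
# "df2", "win_start" and "flat_schema" are illustrative names.
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

df2 = df.withColumn("win_start", F.window("ts", "15 days").start)

flat_schema = StructType([
    StructField("id", DoubleType()),
    StructField("ts", TimestampType()),
    StructField("win_start", TimestampType())
])

@pandas_udf(flat_schema, PandasUDFType.GROUPED_MAP)
def some_udf(pdf):
    # some computation on the group's pandas DataFrame
    return pdf

df2.groupby("id", "win_start").apply(some_udf).show()
{code}
Whether this avoids the Arrow struct conversion path on 2.4.3 would need 
checking, but it keeps the grouping keys to plain types.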
 

However, if I use a built-in aggregation method, it works fine. For example:
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}
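For reference, the window struct in the built-in aggregation result can be 
flattened into plain start/end columns; this is only a convenience sketch, and 
"win_start"/"win_end" are illustrative names:
{code:python}
# Flatten the window struct returned by the built-in aggregation into
# plain start/end timestamp columns ("win_start"/"win_end" are illustrative).
from pyspark.sql import functions as F

(df.groupby("id", F.window("ts", "15 days"))
   .mean()
   .select("id",
           F.col("window.start").alias("win_start"),
           F.col("window.end").alias("win_end"),
           "avg(id)")
   .show(truncate=False))
{code}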


