[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937233#comment-16937233 ]
Bryan Cutler commented on SPARK-28502:
--------------------------------------

I was able to reproduce this in Spark 2.4.3. The problem is that introducing a window in a PandasUDFType.GROUPED_MAP groupby adds an Arrow column that is a StructType to the UDF input. From the above example it is:

{noformat}
pyarrow.Table
window: struct<start: timestamp[us, tz=America/Los_Angeles], end: timestamp[us, tz=America/Los_Angeles]> not null
  child 0, start: timestamp[us, tz=America/Los_Angeles]
  child 1, end: timestamp[us, tz=America/Los_Angeles]
id: double
ts: timestamp[us, tz=America/Los_Angeles]
{noformat}

Spark 2.4.3 is not able to handle this column. In current master it seems to work, and the column is treated as part of the grouping key. The example above will just ignore it by default, but you can get it as part of the UDF input like so:

{code}
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(key, df):
    print("Key: {}".format(key))
    print("DF:\n{}".format(df))
    # some computation
    return df
{code}

For id 13.0 this would print:

{noformat}
Key: (13.0, {'start': datetime.datetime(2018, 3, 5, 0, 0), 'end': datetime.datetime(2018, 3, 20, 0, 0)})
DF:
     id                  ts
0  13.0 2018-03-11 05:27:18
{noformat}

Someone should verify that this is the correct way to handle it and that the result is right. Can you do this [~blakec] or [~nasirali]? If so, we can close this but should follow up and add proper testing.
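For reference, here is a minimal, self-contained sketch of what the check against current master could look like (untested as written; it assumes PyArrow is installed, the data and the 15-day window are taken from the issue below, and it uses DoubleType for {{id}} to match the double values rather than the IntegerType in the original report):

{code}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

# Local session just for the sketch; any existing SparkSession would do.
spark = SparkSession.builder.master("local[2]").appName("SPARK-28502").getOrCreate()

df = spark.createDataFrame(
    [(17.00, "2018-03-10T15:27:18+00:00"),
     (13.00, "2018-03-11T12:27:18+00:00"),
     (25.00, "2018-03-12T11:27:18+00:00")],
    ["id", "ts"]).withColumn("ts", F.col("ts").cast("timestamp"))

# Output schema of the GROUPED_MAP UDF; DoubleType for id to match the input data.
schema = StructType([
    StructField("id", DoubleType()),
    StructField("ts", TimestampType())
])

# Two-argument form: `key` carries the grouping values as a tuple, including
# the window struct; `pdf` holds the rows of the group (id and ts only).
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(key, pdf):
    # print goes to executor stdout; good enough for a sanity check.
    print("Key: {}".format(key))
    # some computation
    return pdf

df.groupby("id", F.window("ts", "15 days")).apply(some_udf).show()
{code}

On 2.4.3 the same code should still hit the TypeError from the report; the sketch is only useful for checking the key handling on master.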
> Error with struct conversion while using pandas_udf
> ---------------------------------------------------
>
>                 Key: SPARK-28502
>                 URL: https://issues.apache.org/jira/browse/SPARK-28502
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>         Environment: OS: Ubuntu
>                      Python: 3.6
>            Reporter: Nasir Ali
>            Priority: Minor
>
> What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and the error message I am getting.
>
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
>                                    (13.00, "2018-03-11T12:27:18+00:00"),
>                                    (25.00, "2018-03-12T11:27:18+00:00"),
>                                    (20.00, "2018-03-13T15:27:18+00:00"),
>                                    (17.00, "2018-03-14T12:27:18+00:00"),
>                                    (99.00, "2018-03-15T11:27:18+00:00"),
>                                    (156.00, "2018-03-22T11:27:18+00:00"),
>                                    (17.00, "2018-03-31T11:27:18+00:00"),
>                                    (25.00, "2018-03-15T11:27:18+00:00"),
>                                    (25.00, "2018-03-16T11:27:18+00:00")
>                                   ],
>                                   ["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
>
> schema = StructType([
>     StructField("id", IntegerType()),
>     StructField("ts", TimestampType())
> ])
>
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
>     # some computation
>     return df
>
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws the following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
> However, if I use a builtin agg method then it works fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output:
> {code:java}
> +-----+------------------------------------------+-------+
> |id   |window                                    |avg(id)|
> +-----+------------------------------------------+-------+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-----+------------------------------------------+-------+
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org