[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995815#comment-16995815 ]
Nasir Ali edited comment on SPARK-28502 at 12/13/19 6:45 PM:
-------------------------------------------------------------
{code:java}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, TimestampType

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])
    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column.
It throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in returnType
    to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, in to_arrow_type
    raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to Arrow

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in returnType
    "%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: StructType(List(StructField(timestamp,TimestampType,true),StructField(window,StructType(List(StructField(start,TimestampType,true),StructField(end,TimestampType,true))),true),StructField(some_val,StringType,true))) is not supported
{code}
However, if I manually run *to_arrow_schema(schema)*, it works fine and raises no exception.
[https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L139]
{code:java}
from pyspark.sql.types import to_arrow_schema
to_arrow_schema(schema)
{code}
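A possible workaround, sketched below with plain pandas (the `flatten_window` helper and the `window_start`/`window_end` column names are hypothetical, not part of the code above): have the UDF return the grouping key's window as two flat timestamp columns instead of one struct column, so the declared return schema contains no nested StructType for Arrow to reject.

```python
import pandas as pd

# Hypothetical sketch: emit the window as two flat timestamp columns
# instead of a nested struct, so the grouped-map return schema needs
# no nested StructType.
def flatten_window(window, user_data):
    # 'window' stands in for the (start, end) pair carried by the
    # grouping key (e.g. what key[2] holds inside the pandas UDF).
    start, end = window
    out = user_data[["timestamp"]].copy()
    out["window_start"] = start
    out["window_end"] = end
    out["some_val"] = "tesss"
    return out

df = pd.DataFrame({"timestamp": pd.to_datetime(["2018-03-10", "2018-03-11"])})
res = flatten_window((pd.Timestamp("2018-03-05"), pd.Timestamp("2018-03-20")), df)
print(list(res.columns))
```

On the Spark side the matching return schema would then be four flat fields (timestamp, window_start, window_end, some_val), all of which *to_arrow_type* can convert.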
> Error with struct conversion while using pandas_udf
> ---------------------------------------------------
>
>                 Key: SPARK-28502
>                 URL: https://issues.apache.org/jira/browse/SPARK-28502
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>         Environment: OS: Ubuntu
> Python: 3.6
>            Reporter: Nasir Ali
>            Priority: Minor
>             Fix For: 3.0.0
>
> What I am trying to do: group data into time-interval windows (e.g., a 15-day
> window) and perform some operations on the dataframe using (pandas) UDFs.
> I don't know if there is a better/cleaner way to do it.
> Below is the sample code I tried and the error message I get.
>
> {code:java}
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
>
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
>                                    (13.00, "2018-03-11T12:27:18+00:00"),
>                                    (25.00, "2018-03-12T11:27:18+00:00"),
>                                    (20.00, "2018-03-13T15:27:18+00:00"),
>                                    (17.00, "2018-03-14T12:27:18+00:00"),
>                                    (99.00, "2018-03-15T11:27:18+00:00"),
>                                    (156.00, "2018-03-22T11:27:18+00:00"),
>                                    (17.00, "2018-03-31T11:27:18+00:00"),
>                                    (25.00, "2018-03-15T11:27:18+00:00"),
>                                    (25.00, "2018-03-16T11:27:18+00:00")
>                                    ],
>                                   ["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
>
> schema = StructType([
>     StructField("id", IntegerType()),
>     StructField("ts", TimestampType())
> ])
>
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
>     # some computation
>     return df
>
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws the following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start:
> timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>
> However, if I use a builtin agg method then it works fine.
> For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output:
> {code:java}
> +-----+------------------------------------------+-------+
> |id   |window                                    |avg(id)|
> +-----+------------------------------------------+-------+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-----+------------------------------------------+-------+
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
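For reference, the 15-day bucketing in the quoted report can be sketched without Spark using pandas' `pd.Grouper`. This is an illustrative stand-in, not the reporter's code: `pd.Grouper`'s bucketing is not identical to Spark's `F.window`, and the `origin` value below is chosen by hand to reproduce the window boundaries shown in the output above.

```python
import pandas as pd

# Illustrative stand-in for F.window("ts", "15 days"): bucket rows into
# 15-day windows anchored at 2018-03-05 (matching the boundaries in the
# Spark output above) and average "id" per bucket.
df = pd.DataFrame({
    "id": [17.0, 13.0, 156.0],
    "ts": pd.to_datetime(["2018-03-10T15:27:18",
                          "2018-03-11T12:27:18",
                          "2018-03-22T11:27:18"]),
})
out = df.groupby(pd.Grouper(key="ts", freq="15D", origin="2018-03-05"))["id"].mean()
print(out)
```

Here the first two rows fall in the [2018-03-05, 2018-03-20) bucket and the third in [2018-03-20, 2018-04-04), mirroring the two windows in the quoted Spark output.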