[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489972#comment-16489972 ] Cristian Consonni commented on SPARK-23929: --- This bug was referenced in [issue SPARK-24324|https://issues.apache.org/jira/browse/SPARK-24324], which is a duplicate of this one. See also pull request [#21427|https://github.com/apache/spark/pull/21427]. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453880#comment-16453880 ] Tr3wory commented on SPARK-23929: - Yes, the documentation is a must, even for 2.3 if possible (it's a new feature of 2.3, right?). > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453646#comment-16453646 ] Hyukjin Kwon commented on SPARK-23929: -- How about we document the current behaviour for now and then we start a discussion in mailing list when we are close to 3.0.0? I think we already started to talk about Spark 3.0.0. So, we fix the documentation first here (for 2.4.0 maybe) and it'd be good to start the discussion about this again when we are clear on 3.0.0. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452459#comment-16452459 ] Tr3wory commented on SPARK-23929: - Yes, but that's not simpler than using "columns=[...]": {code:python} from collections import OrderedDict @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def constants(grp): return pd.DataFrame(OrderedDict([("id",grp.iloc[0]['id']), ("zeros",0), ("ones",1)]),index = [0]) df.groupby("id").apply(constants).show() {code} > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452355#comment-16452355 ] Li Jin commented on SPARK-23929: [~tr3w] does using OrderedDict help in your case? > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452282#comment-16452282 ] Tr3wory commented on SPARK-23929: - I think the problem is even more nuanced: in python the order of the values in a dict are not defined, so generally if you create a new DataFrame in your function from a dict (like in the previous example) you need to specify the order manually. So {code} @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def constants(grp): return pd.DataFrame({"id":grp.iloc[0]['id'], "ones":1, "zeros":0},index = [0]) df.groupby("id").apply(constants).show() {code} and {code} @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def constants(grp): return pd.DataFrame({"id":grp.iloc[0]['id'], "zeros":0, "ones":1},index = [0]) df.groupby("id").apply(constants).show() {code} gives you the exact same wrong results. The fact that this happens without any error or warning is even more worrying (but only if the types are compatible, if any of them a string, you get strange errors). You need to specify the order manually: {code} @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def constants(grp): return pd.DataFrame({"id":grp.iloc[0]['id'], "zeros":0, "ones":1}, columns=['id', 'zeros', 'ones'], index=[0]) df.groupby("id").apply(constants).show() {code} I think the current implementation is hard to use correctly and very easy to use incorrectly which greatly outweighs the small flexibility of not specify the names... > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438935#comment-16438935 ] Li Jin commented on SPARK-23929: I agree with [~hyukjin.kwon]. Seems like there is not a strong enough reason to change the behavior. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432344#comment-16432344 ] Omri commented on SPARK-23929: -- [~icexelloss], I couldn't recreate the problem I had where the order was mixed, but I have a different example to illustrate the problem. Here the schema struct is [id,zeros,ones] but the user returned a data frame with [id,ones,zeros]. The column names are taken from the provided schema and not from the explicitly mentioned data frame. {code:java} from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import * import pandas as pd df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) schema = StructType([ StructField("id", LongType()), StructField("zeros", DoubleType()), StructField("ones", DoubleType()) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def median_per_group(grp): return pd.DataFrame({"id":grp.iloc[0]['id'],"ones":1,"zeros":0},index = [0]) df.groupby("id").apply(median_per_group).show() {code} results: {code:java} +---+-++ | id|zeros|ones| +---+-++ | 1| 1.0| 0.0| | 2| 1.0| 0.0| +---+-++ {code} So the user was expecting ones to have 1 and zeros to have 0 but they get wrong results. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431585#comment-16431585 ] Hyukjin Kwon commented on SPARK-23929: -- Unless there's a strong reason for going ahead with mapping by name, I would propose to fix the documentation for now. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431584#comment-16431584 ] Hyukjin Kwon commented on SPARK-23929: -- I think we already use positional-based approach and released out .. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431188#comment-16431188 ] Li Jin commented on SPARK-23929: I think there are pros and cons for both matching by position and by name. Match by position give the user the flexibility of not needing to spell out column names in the udf. e.g. {code:java} @pandas_udf("id long, v double, v1 double", PandasUDFType.GROUPED_MAP) def normalize(pdf): id = pdf.id vs = # return pd.DataFrame([id + vs]) {code} Match by name give the user the flexibility of reorder columns. Admittedly, the choice is somewhat arbitrary now. But I am also not sure if one is strictly better. [~omri374] in what case would you have out of order return value in your UDF? I am trying to see if that's more common. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431014#comment-16431014 ] Bryan Cutler commented on SPARK-23929: -- cc [~icexelloss] > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430321#comment-16430321 ] Omri commented on SPARK-23929: -- [~hyukjin.kwon] I can do a PR, however I think this should only be a temporary solution. As you can see in the example, the results are quite different. When working on a real life scenario, I found that it actually worked when the order was the opposite. so the struct was "id long, v double" and the returned data frame was "v double, id long". Flipping it caused a mismatch of the types. If you're still interested in a PR for documentation, please let me know. > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name
[ https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430169#comment-16430169 ] Hyukjin Kwon commented on SPARK-23929: -- I think documentation would be good enough for now. Would you be interested in a PR? > pandas_udf schema mapped by position and not by name > > > Key: SPARK-23929 > URL: https://issues.apache.org/jira/browse/SPARK-23929 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 > Environment: PySpark > Spark 2.3.0 > >Reporter: Omri >Priority: Major > > The return struct of a pandas_udf should be mapped to the provided schema by > name. Currently it's not the case. > Consider these two examples, where the only change is the order of the fields > in the provided schema struct: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > and this one: > {code:java} > from pyspark.sql.functions import pandas_udf, PandasUDFType > df = spark.createDataFrame( > [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], > ("id", "v")) > @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP) > def normalize(pdf): > v = pdf.v > return pdf.assign(v=(v - v.mean()) / v.std()) > df.groupby("id").apply(normalize).show() > {code} > The results should be the same but they are different: > For the first code: > {code:java} > +---+---+ > | v| id| > +---+---+ > |1.0| 0| > |1.0| 0| > |2.0| 0| > |2.0| 0| > |2.0| 1| > +---+---+ > {code} > For the second code: > {code:java} > +---+---+ > | id| v| > +---+---+ > | 1|-0.7071067811865475| > | 1| 0.7071067811865475| > | 2|-0.8320502943378437| > | 2|-0.2773500981126146| > | 2| 1.1094003924504583| > +---+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org