[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865433#comment-16865433 ]

Chris Martin commented on SPARK-27463:
--

Sounds good to me too.

> Support Dataframe Cogroup via Pandas UDFs
> --
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL
> Affects Versions: 3.0.0
> Reporter: Chris Martin
> Priority: Major
>
> Recent work on Pandas UDFs in Spark, has allowed for improved
> interoperability between Pandas and Spark. This proposal aims to extend this
> by introducing a new Pandas UDF type which would allow for a cogroup
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863567#comment-16863567 ]

Hyukjin Kwon commented on SPARK-27463:
--

Yeah, I think it'd be easier to discuss this with a PR.
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16863177#comment-16863177 ]

Li Jin commented on SPARK-27463:
--

Yeah, I think the exact spelling of the API can go either way. The current options are close enough that we don't need to commit to either one at this moment, and the choice shouldn't affect the implementation much.

[~d80tb7] [~hyukjin.kwon] How about we start working towards a PR that implements one of the proposed APIs and go from there?
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862828#comment-16862828 ]

Chris Martin commented on SPARK-27463:
--

Hi [~hyukjin.kwon]

Ah, I see your concern now. I think it's fair to say that the proposed cogrouping functionality has no analogous API in Pandas. In my opinion that's understandable: Pandas is fundamentally a library for manipulating local data, so the problems of colocating multiple DataFrames don't apply as they do in Spark. That said, the inspiration behind the proposed API is clearly the Pandas groupby().apply(), so I'd argue it is not without precedent.

I think the more direct comparison here is with the existing Dataset cogroup, where the high-level functionality is almost exactly the same (partition two distinct DataFrames such that their partitions are cogrouped, then apply a flatmap operation over them). The differences are in:

* the cogroup key definition (typed for Datasets, untyped for pandas-udf),
* the input (Iterables for Datasets, Pandas DataFrames for pandas-udf), and
* the output (Iterable for Datasets, Pandas DataFrame for pandas-udf).

Now at this point one might observe that we have two different language-specific implementations of the same high-level functionality. This is true; however, it's been the case since the introduction of Pandas UDFs (see groupBy().apply() vs groupByKey().flatMapGroups()) and is IMHO a good thing: it allows us to provide functionality that plays to the strengths of each individual language, given that what is simple and idiomatic in Python is not in Scala and vice versa.

If, considering this, we agree that this cogroup functionality is both useful and suitable for exposing via a Pandas UDF (and I hope we do, but please say if you disagree), the question becomes what we would like the API to be. At this point let's consider the API as currently proposed in the design doc.

{code:python}
result = df1.cogroup(df2, on='id').apply(my_pandas_udf)
{code}

This API is concise and consistent with the existing groupby().apply(). The disadvantage is that it isn't consistent with Dataset's cogroup and, as this API doesn't exist in Pandas, it can't be consistent with that either (although I would argue that if Pandas did introduce such an API it would look a lot like this).

The alternative would be to implement something on RelationalGroupedDataset, as described by Li in the post above. (I think we can discount something based on KeyValueGroupedDataset: if my reading of the code is correct, this would only apply to typed APIs, which this isn't.) The big advantage here is that this is much more consistent with the existing Dataset cogroup. On the flip side, it comes at the cost of a little more verbosity and IMHO is a little less pythonic/in the style of Pandas.

That being the case, I'm slightly in favour of the API as currently proposed in the design doc, but am happy to be swayed to something else if the majority have a different opinion.
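The cogroup-then-apply semantics discussed in this comment (bring together the rows of two inputs that share a key, then run one function per key over both groups and flatten the results) can be sketched in plain Python. This is only an illustration of the idea and not Spark's or PySpark's API: the names `cogroup_apply` and `summarize`, and the use of lists of dicts as rows, are assumptions made for the example.

```python
from collections import defaultdict


def cogroup_apply(left_rows, right_rows, key, func):
    """Group two lists of dict rows by `key`, then call
    func(key_value, left_group, right_group) once per distinct key
    and flatten the returned rows into one list.

    Hypothetical helper for illustration only; not a Spark API.
    """
    left_groups = defaultdict(list)
    right_groups = defaultdict(list)
    for row in left_rows:
        left_groups[row[key]].append(row)
    for row in right_rows:
        right_groups[row[key]].append(row)

    result = []
    # A key present on only one side still produces a call,
    # with an empty group for the other side.
    for k in sorted(set(left_groups) | set(right_groups)):
        result.extend(func(k, left_groups.get(k, []), right_groups.get(k, [])))
    return result


def summarize(k, left_group, right_group):
    """Example per-key function: report how many rows each side has."""
    return [{"id": k, "n_left": len(left_group), "n_right": len(right_group)}]


left = [{"id": 1, "v": 10}, {"id": 1, "v": 20}, {"id": 2, "v": 30}]
right = [{"id": 1, "w": 1}, {"id": 3, "w": 2}]
result = cogroup_apply(left, right, "id", summarize)
```

In the proposal, the two groups handed to the function would be Pandas DataFrames rather than lists, and the grouping would happen across Spark partitions rather than in local dictionaries.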
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862287#comment-16862287 ]

Li Jin commented on SPARK-27463:
--

I think one way to design this API is to mimic the existing Dataset cogroup API:

{code:python}
gdf1 = df1.groupByKey('id')
gdf2 = df2.groupByKey('id')
result = gdf1.cogroup(gdf2).apply(my_pandas_udf)
{code}

However, KeyValueGroupedDataset and groupByKey aren't really exposed to PySpark (or maybe they don't apply to PySpark because of typing?). So another way to go about this is to use RelationalGroupedDataset:

{code:python}
gdf1 = df1.groupBy('id')
gdf2 = df2.groupBy('id')
result = gdf1.cogroup(gdf2).apply(my_pandas_udf)
{code}
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862225#comment-16862225 ]

Li Jin commented on SPARK-27463:
--

For cogroup, I don't think there is an analogous API in Pandas. There is an analogous Spark Scala API in KeyValueGroupedDataset:

{code:scala}
// Sketch: assumes df1 and df2 are typed Datasets whose element type has an `id` field,
// and that my_scala_udf takes (key, left iterator, right iterator) and returns the output rows.
val gdf1 = df1.groupByKey(_.id)
val gdf2 = df2.groupByKey(_.id)
val result = gdf1.cogroup(gdf2)(my_scala_udf)
{code}
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861999#comment-16861999 ]

Hyukjin Kwon commented on SPARK-27463:
--

It's easier and safer to find a reference that justifies a new API than to implement an API from scratch. Usually our Pandas UDF APIs mimic Pandas' APIs or borrow an idea from them (e.g., groupby().apply(...)), and then adapt it to PySpark. There are also some examples that work just like other PySpark (or Scala-side Spark) APIs (e.g., window Pandas UDFs).
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861995#comment-16861995 ]

Chris Martin commented on SPARK-27463:
--

Also, my assumption is that the most difficult part of this is extending the UDF functionality so that multiple DataFrames can be passed as arguments to a given UDF. I have a fairly rough design proposal for how this might be achieved. Once it has been refined slightly I'll post it so that people can comment.
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861993#comment-16861993 ]

Chris Martin commented on SPARK-27463:
--

Hi [~hyukjin.kwon], I've just started working on the code side of this (as an aside, I seem unable to assign this Jira to myself; do you know how I can do this?).

Regarding your questions: I don't think there is an analogous API in Pandas, although perhaps [~icexelloss] knows of one.

In terms of comparison to the Dataset cogroup, there are obviously a number of similarities, but the biggest difference is that in the Scala version you end up operating on a couple of Scala Iterators, whereas in this proposal you would operate on a couple of Pandas DataFrames. This means that the Scala version doesn't necessarily need to be able to store the entire cogroup in memory, but on the other hand it gives you a much less rich data structure (a Scala Iterator as opposed to a Pandas DataFrame).

I think this distinction is basically analogous to that between the Python groupby().apply() and the Scala groupByKey().flatMapGroups(). In each case you end up operating on a data structure which is more in keeping with the language at hand.
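The iterator-versus-DataFrame trade-off described in this comment can be illustrated with a small plain-Python sketch. It is not Spark code: `row_source`, `streaming_total`, and `materialized_median` are hypothetical names, and a generator stands in for a Scala Iterator while a list stands in for a fully materialized Pandas DataFrame.

```python
from statistics import median


def row_source():
    """Yield the rows of one group lazily, one at a time (stand-in for an iterator)."""
    for v in (5, 1, 3):
        yield {"v": v}


def streaming_total(rows):
    """Consume rows one by one; only the running total is kept in memory."""
    total = 0
    for row in rows:
        total += row["v"]
    return total


def materialized_median(rows):
    """Collect every value of the group first; the whole group is held in memory,
    which is what makes richer operations such as a median straightforward."""
    values = [row["v"] for row in rows]
    return median(values)


total = streaming_total(row_source())
mid = materialized_median(row_source())
```

The first function works on groups of any size but only supports a single forward pass; the second needs the group to fit in memory but can use operations that look at all rows at once.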
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861749#comment-16861749 ]

Hyukjin Kwon commented on SPARK-27463:
--

BTW, I think we already have cogroup on Dataset on the Scala side. How is this different from that?
[jira] [Commented] (SPARK-27463) Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861747#comment-16861747 ]

Hyukjin Kwon commented on SPARK-27463:
--

[~d80tb7] are you still working on this? If there are some API references in Pandas, I think we can just mimic them. If so, can you just open a PR? cc [~icexelloss] as well.