`BucketTransform` is a built-in partition transform in Spark, not a UDF from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to represent its bucket transform, or the Spark built-in `BucketTransform`? I'm asking because other v2 sources may also use the built-in `BucketTransform` but with a different bucket hash function. Alternatively, we could clearly define the bucket hash function of the built-in `BucketTransform` in the doc.
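To make the concern above concrete, here is a minimal sketch of two v2 sources bucketing the same value with different hash functions. The two hash choices (CRC32 vs. a byte sum) are purely illustrative placeholders, not the actual Spark or Iceberg bucket implementations:

```python
import zlib

NUM_BUCKETS = 16

def bucket_source_a(value: str) -> int:
    # Hypothetical source A: buckets by CRC32 of the UTF-8 bytes.
    return zlib.crc32(value.encode("utf-8")) % NUM_BUCKETS

def bucket_source_b(value: str) -> int:
    # Hypothetical source B: buckets by a simple byte sum.
    return sum(value.encode("utf-8")) % NUM_BUCKETS

value = "order-12345"
# Same input value, but the two sources will in general assign it to
# different bucket IDs, so pairing up "bucket i" splits from each side
# (the phase 1 split-wise join) would produce wrong results.
print(bucket_source_a(value), bucket_source_b(value))
```

Both functions are deterministic and well defined on their own; the problem is only that they disagree with each other, which is exactly what makes "bucket(col, num)" ambiguous across sources.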
On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote: > Two v2 sources may return different bucket IDs for the same value, and > this breaks the phase 1 split-wise join. > > This is why the FunctionCatalog included a canonicalName method (docs > <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>). > That method returns an identifier that can be used to compare whether two > bucket function instances are the same. > > > 1. Can we apply this idea to partitioned file source tables > (non-bucketed) as well? > > What do you mean here? The design doc discusses transforms like days(ts) > that can be supported in the future. Is that what you’re asking about? Or > are you referring to v1 file sources? I think the goal is to support v2, > since v1 doesn’t have reliable behavior. > > Note that the initial implementation goal is to support bucketing since > that’s an easier case because both sides have the same number of > partitions. More complex storage-partitioned joins can be implemented later. > > > 2. What if the table has many partitions? Shall we apply certain join > algorithms in the phase 1 split-wise join as well? Or even launch a Spark > job to do so? > > I think that this proposal opens up a lot of possibilities, like what > you’re suggesting here. It is a bit like AQE. We’ll need to come up with > heuristics for choosing how and when to use storage partitioning in joins. > As I said above, bucketing is a great way to get started because it fills > an existing gap. More complex use cases can be supported over time. > > Ryan > > On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote: > >> IIUC, the general idea is to let each input split report its partition >> value, and Spark can perform the join in two phases: >> 1.
join the input splits from left and right tables according to their >> partition values and join keys, at the driver side. >> 2. for each joined input splits pair (or a group of splits), launch a >> Spark task to join the rows. >> >> My major concern is about how to define "compatible partitions". Things >> like `days(ts)` are straightforward: the same timestamp value always >> results in the same partition value, in any v2 source. `bucket(col, >> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2 >> sources may return different bucket IDs for the same value, and this breaks >> the phase 1 split-wise join. >> >> And two questions for further improvements: >> 1. Can we apply this idea to partitioned file source tables >> (non-bucketed) as well? >> 2. What if the table has many partitions? Shall we apply certain join >> algorithms in the phase 1 split-wise join as well? Or even launch a Spark >> job to do so? >> >> Thanks, >> Wenchen >> >> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote: >> >>> Thanks Cheng for the comments. >>> >>> > Is migrating Hive table read path to data source v2, being a >>> prerequisite of this SPIP >>> >>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if >>> Hive eventually moves to use the V2 API. With that said, I think some of the >>> ideas could be useful for V1 Hive support as well. For instance, with the >>> newly proposed logic to compare whether output partitionings from both >>> sides of a join operator are compatible, we can have HiveTableScanExec >>> report a partitioning other than HashPartitioning, and >>> EnsureRequirements could potentially recognize that and therefore avoid >>> shuffle if both sides report the same compatible partitioning. In addition, >>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes >>> the constraint for V1 bucket join so that the join keys need not >>> be identical to the bucket keys.
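The compatibility check discussed above can be sketched in miniature: partitionings reported by two scans are co-partitioned only if the bucket function's canonical name and the bucket count agree. The class and field names below are hypothetical, loosely modeled on the BoundFunction.canonicalName idea rather than Spark's actual classes:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class BucketPartitioning:
    # Hypothetical model of what a v2 scan could report for its bucket transform.
    canonical_name: str   # identifies the hash function, e.g. "iceberg.bucket"
    num_buckets: int
    column: str

def compatible(left: BucketPartitioning, right: BucketPartitioning) -> bool:
    # Two bucket partitionings can only be treated as co-partitioned if they
    # use the same hash function (same canonical name) and bucket count;
    # the bucketed columns themselves are matched via the join keys.
    return (left.canonical_name == right.canonical_name
            and left.num_buckets == right.num_buckets)

l = BucketPartitioning("iceberg.bucket", 32, "id")
r = BucketPartitioning("iceberg.bucket", 32, "user_id")
print(compatible(l, r))                                        # True
print(compatible(l, BucketPartitioning("hive.bucket", 32, "id")))  # False
```

Comparing an opaque identifier sidesteps the need for Spark to define one blessed bucket hash function: each source keeps its own, and the shuffle is only elided when both sides provably agree.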
>>> >>> > Would aggregate work automatically after the SPIP? >>> >>> Yes, it will work as before. This case is already supported by >>> DataSourcePartitioning in V2 (see SPARK-22389). >>> >>> > Any major use cases in mind except Hive bucketed table? >>> >>> Our first use case is Apache Iceberg. In addition to that we also want >>> to add support for Spark's built-in file data sources. >>> >>> Thanks, >>> Chao >>> >>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote: >>> >>>> +1 for this. This is an exciting step toward efficiently reading bucketed >>>> tables from other systems (Hive, Trino & Presto)! >>>> >>>> >>>> >>>> Still looking at the details but I have some early questions: >>>> >>>> >>>> >>>> 1. Is migrating the Hive table read path to data source v2 a >>>> prerequisite of this SPIP? >>>> >>>> >>>> >>>> The Hive table read path is currently a mix of data source v1 (for Parquet >>>> & ORC file formats only) and the legacy Hive code path (HiveTableScanExec). In >>>> the SPIP, I see we only make changes for data source v2, so I am wondering >>>> how this would work with the existing Hive table read path. In addition, just >>>> FYI, support for writing Hive bucketed tables was merged into master recently ( >>>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has >>>> details). >>>> >>>> >>>> >>>> 2. Would aggregate work automatically after the SPIP? >>>> >>>> >>>> >>>> Another major benefit of having bucketed tables is avoiding shuffle >>>> before aggregate. Just want to bring to our attention that it would be >>>> great to consider aggregates as well in this proposal. >>>> >>>> >>>> >>>> 3. Any major use cases in mind except Hive bucketed tables? >>>> >>>> >>>> >>>> Just curious if there are any other use cases we are targeting as part of the >>>> SPIP.
>>>> >>>> >>>> >>>> Thanks, >>>> Cheng Su >>>> >>>> >>>> >>>> >>>> >>>> >>>> *From: *Ryan Blue <b...@apache.org> >>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM >>>> *To: *John Zhuge <jzh...@apache.org> >>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>, >>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun < >>>> dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Wenchen >>>> Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>, dev < >>>> dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com> >>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data >>>> Source V2 >>>> >>>> Instead of commenting on the doc, could we keep discussion here on the >>>> dev list please? That way more people can follow it and there is more room >>>> for discussion. Comment threads have a very small area and easily become >>>> hard to follow. >>>> >>>> >>>> >>>> Ryan >>>> >>>> >>>> >>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote: >>>> >>>> +1 Nicely done! >>>> >>>> >>>> >>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote: >>>> >>>> Oops, sorry. I just fixed the permission setting. >>>> >>>> >>>> >>>> Thanks everyone for the positive support! >>>> >>>> >>>> >>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> >>>> wrote: >>>> >>>> +1 to this SPIP and nice writeup of the design doc! >>>> >>>> >>>> >>>> Can we open comment permission in the doc so that we can discuss >>>> details there? >>>> >>>> >>>> >>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com> >>>> wrote: >>>> >>>> Seems to make sense to me. >>>> >>>> Would be great to have some feedback from people such as @Wenchen Fan >>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu >>>> <angers....@gmail.com>. >>>> >>>> >>>> >>>> >>>> >>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com> >>>> wrote: >>>> >>>> +1 for this SPIP.
>>>> >>>> >>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com> >>>> wrote: >>>> >>>> +1. Thanks for lifting the current restrictions on bucket join and >>>> making this more generalized. >>>> >>>> >>>> >>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote: >>>> >>>> +1 from me as well. Thanks Chao for doing so much to get it to this >>>> point! >>>> >>>> >>>> >>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote: >>>> >>>> +1 on this SPIP. >>>> >>>> This is a more generalized version of bucketed tables and bucketed >>>> joins which can eliminate very expensive data shuffles during joins, and >>>> many users in the Apache Spark community have wanted this feature for >>>> a long time! >>>> >>>> Thank you, Ryan and Chao, for working on this, and I look forward to >>>> it as a new feature in Spark 3.3. >>>> >>>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1 >>>> >>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote: >>>> > >>>> > Hi, >>>> > >>>> > Ryan and I drafted a design doc to support a new type of join: >>>> storage partitioned join which covers bucket join support for DataSourceV2 >>>> but is more general. The goal is to let Spark leverage distribution >>>> properties reported by data sources and eliminate shuffle whenever >>>> possible. >>>> > >>>> > Design doc: >>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE >>>> (includes a POC link at the end) >>>> > >>>> > We'd like to start a discussion on the doc and any feedback is >>>> welcome! >>>> > >>>> > Thanks, >>>> > Chao >>>> >>>> >>>> >>>> -- >>>> >>>> Ryan Blue >>>> >>>> >>>> >>>> >>>> -- >>>> >>>> John Zhuge >>>> >>>> >>>> >>>> >>>> -- >>>> >>>> Ryan Blue >>> > > -- > Ryan Blue >
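For reference, the two-phase scheme Wenchen summarized earlier in the thread (pair up input splits by partition value at the driver, then join rows per pair in tasks) can be sketched as follows. All names are hypothetical, and phase 2 is represented only by the returned pairing:

```python
from collections import defaultdict

def phase1_pair_splits(left_splits, right_splits):
    """Phase 1, at the driver: group splits by reported partition value.

    left_splits / right_splits are lists of (partition_value, split_id).
    Returns {partition_value: (left_ids, right_ids)}; each entry would
    become one phase 2 task that joins the rows of those splits.
    """
    by_value = defaultdict(lambda: ([], []))
    for value, split in left_splits:
        by_value[value][0].append(split)
    for value, split in right_splits:
        by_value[value][1].append(split)
    # Inner-join semantics: only partition values present on both sides
    # yield a task, so non-matching partitions are pruned before launch.
    return {v: (l, r) for v, (l, r) in by_value.items() if l and r}

left = [(0, "l-split-0"), (1, "l-split-1"), (2, "l-split-2")]
right = [(1, "r-split-0"), (2, "r-split-1"), (3, "r-split-2")]
tasks = phase1_pair_splits(left, right)
print(sorted(tasks))  # → [1, 2]: only partition values on both sides survive
```

This also shows why the partition values (and, for buckets, the hash function producing them) must mean the same thing on both sides: the pairing is done purely on those reported values, before any rows are read.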