Thanks all for your inputs here!
It seems the discussion has already settled, so I will be the shepherd for the SPIP and will call for a vote on it in a new thread.

On 2021/10/28 13:05:53, Wenchen Fan <cloud0...@gmail.com> wrote:

> Thanks for the explanation! It makes sense to always resolve the logical
> transforms to concrete implementations, and check the concrete
> implementations to decide compatible partitions. We can discuss more
> details in the PR later.
>
> On Thu, Oct 28, 2021 at 4:14 AM Ryan Blue <b...@apache.org> wrote:
>
> > The transform expressions in v2 are logical, not concrete implementations.
> > Even days may have different implementations -- the only expectation is
> > that the partitions are day-sized. For example, you could use a transform
> > that splits days at UTC 00:00, or uses some other day boundary.
> >
> > Because the expressions are logical, we need to resolve them to
> > implementations at some point, like Chao outlines. We can do that using a
> > FunctionCatalog, although I think it's worth considering adding an
> > interface so that a transform from a Table can be converted into a
> > `BoundFunction` directly. That is easier than defining a way for Spark to
> > query the function catalog.
> >
> > In any case, I'm sure it's easy to understand how this works once you get
> > a concrete implementation.
> >
> > On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan <cloud0...@gmail.com> wrote:
> >
> >> `BucketTransform` is a builtin partition transform in Spark, instead of a
> >> UDF from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to
> >> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> >> I'm asking this because other v2 sources may also use the builtin
> >> `BucketTransform` but use a different bucket hash function. Or we can
> >> clearly define the bucket hash function of the builtin `BucketTransform` in
> >> the doc.
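The resolution step Ryan describes above — turning a logical transform into a concrete implementation before comparing partitions — can be sketched roughly as follows. This is an illustrative Python mock only: the real interfaces (`FunctionCatalog`, `BoundFunction`) are Java APIs in Spark's connector catalog, and every name below is a hypothetical stand-in, not Spark code.

```python
# Sketch: resolving a logical transform like bucket(16, col) to a
# concrete implementation, mirroring the idea of converting a Table's
# transform into a BoundFunction. All names here are hypothetical.

class BoundBucketFunction:
    """Concrete implementation of a logical bucket transform."""

    def __init__(self, canonical_name, num_buckets, hash_fn):
        self.canonical_name = canonical_name  # identity used for comparisons
        self.num_buckets = num_buckets
        self.hash_fn = hash_fn

    def apply(self, value):
        # Map a column value to a bucket ID in [0, num_buckets).
        return self.hash_fn(value) % self.num_buckets


class FunctionCatalog:
    """Maps logical transform names to concrete implementations."""

    def __init__(self):
        self._impls = {}

    def register(self, name, impl):
        self._impls[name] = impl

    def load(self, name):
        return self._impls[name]


# A source registers its own bucket implementation; resolving the
# logical transform yields the concrete function that decides which
# partition a value lands in.
catalog = FunctionCatalog()
catalog.register("bucket", BoundBucketFunction("iceberg.bucket", 16, hash))
bucket = catalog.load("bucket")
print(bucket.apply(42))  # a bucket ID in [0, 16)
```

The point of the sketch is that the logical `bucket(16, col)` says nothing about the hash function; only the resolved implementation does, which is why two sources must resolve before their partitions can be compared.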
> >>
> >> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote:
> >>
> >>> Two v2 sources may return different bucket IDs for the same value, and
> >>> this breaks the phase 1 split-wise join.
> >>>
> >>> This is why the FunctionCatalog included a canonicalName method (docs
> >>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
> >>> That method returns an identifier that can be used to compare whether two
> >>> bucket function instances are the same.
> >>>
> >>> 1. Can we apply this idea to partitioned file source tables
> >>> (non-bucketed) as well?
> >>>
> >>> What do you mean here? The design doc discusses transforms like days(ts)
> >>> that can be supported in the future. Is that what you’re asking about? Or
> >>> are you referring to v1 file sources? I think the goal is to support v2,
> >>> since v1 doesn’t have reliable behavior.
> >>>
> >>> Note that the initial implementation goal is to support bucketing since
> >>> that’s an easier case because both sides have the same number of
> >>> partitions. More complex storage-partitioned joins can be implemented
> >>> later.
> >>>
> >>> 2. What if the table has many partitions? Shall we apply certain
> >>> join algorithms in the phase 1 split-wise join as well? Or even launch a
> >>> Spark job to do so?
> >>>
> >>> I think that this proposal opens up a lot of possibilities, like what
> >>> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
> >>> heuristics for choosing how and when to use storage partitioning in joins.
> >>> As I said above, bucketing is a great way to get started because it fills
> >>> an existing gap. More complex use cases can be supported over time.
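The `canonicalName` check Ryan points at above can be illustrated with a small mock. The real method is on `BoundFunction` in Spark's Java FunctionCatalog API; this Python sketch only shows the comparison logic, and the class and identifier strings are hypothetical.

```python
# Sketch: two sources may only be treated as co-partitioned when their
# bucket functions report the same canonical name, since the name
# identifies the hash function. Mock classes; not Spark code.

class MockBucketFunction:
    def __init__(self, canonical_name):
        self._name = canonical_name

    def canonical_name(self):
        # Identifier for the implementation, e.g. source + type + width.
        return self._name


def can_storage_partition_join(left_fn, right_fn):
    # Same canonical name => same hash function => bucket IDs line up,
    # so the phase 1 split-wise matching is safe.
    return left_fn.canonical_name() == right_fn.canonical_name()


iceberg_left = MockBucketFunction("iceberg.bucket(int, 16)")
iceberg_right = MockBucketFunction("iceberg.bucket(int, 16)")
other_source = MockBucketFunction("hive.bucket(int, 16)")

print(can_storage_partition_join(iceberg_left, iceberg_right))  # True
print(can_storage_partition_join(iceberg_left, other_source))   # False
```

Note the second pair is rejected even though both sides use 16 buckets: the bucket count matches but the hash functions are not known to agree.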
> >>>
> >>> Ryan
> >>>
> >>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:
> >>>
> >>>> IIUC, the general idea is to let each input split report its partition
> >>>> value, and Spark can perform the join in two phases:
> >>>> 1. join the input splits from left and right tables according to their
> >>>> partition values and join keys, at the driver side.
> >>>> 2. for each joined input splits pair (or a group of splits), launch a
> >>>> Spark task to join the rows.
> >>>>
> >>>> My major concern is about how to define "compatible partitions". Things
> >>>> like `days(ts)` are straightforward: the same timestamp value always
> >>>> results in the same partition value, in whatever v2 sources. `bucket(col,
> >>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
> >>>> sources may return different bucket IDs for the same value, and this breaks
> >>>> the phase 1 split-wise join.
> >>>>
> >>>> And two questions for further improvements:
> >>>> 1. Can we apply this idea to partitioned file source tables
> >>>> (non-bucketed) as well?
> >>>> 2. What if the table has many partitions? Shall we apply certain join
> >>>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
> >>>> job to do so?
> >>>>
> >>>> Thanks,
> >>>> Wenchen
> >>>>
> >>>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
> >>>>
> >>>>> Thanks Cheng for the comments.
> >>>>>
> >>>>> > Is migrating Hive table read path to data source v2, being a
> >>>>> prerequisite of this SPIP
> >>>>>
> >>>>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
> >>>>> Hive eventually moves to use the V2 API. With that said, I think some of
> >>>>> the ideas could be useful for V1 Hive support as well.
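The two-phase plan Wenchen summarizes above can be sketched as plain Python, with phase 1 shown in full. This is only an illustration of the matching logic, not Spark internals; splits are modeled as `(partition_value, split_id)` pairs, and phase 2 (launching a task per matched group) is left as a comment.

```python
# Sketch of the two-phase storage-partitioned join: phase 1 pairs up
# input splits by their reported partition values at the driver; phase 2
# would launch one Spark task per matched group to join the actual rows.
from collections import defaultdict


def phase1_match_splits(left_splits, right_splits):
    """Pair up splits whose reported partition values are equal.

    Each split is (partition_value, split_id). Returns a list of
    (partition_value, left_ids, right_ids) groups; in phase 2, one join
    task would be launched per group.
    """
    left_by_part = defaultdict(list)
    for part, split_id in left_splits:
        left_by_part[part].append(split_id)
    right_by_part = defaultdict(list)
    for part, split_id in right_splits:
        right_by_part[part].append(split_id)
    # Inner join on partition value: only co-partitioned groups survive.
    return [
        (part, left_by_part[part], right_by_part[part])
        for part in sorted(left_by_part.keys() & right_by_part.keys())
    ]


left = [(0, "L0"), (1, "L1"), (2, "L2")]
right = [(0, "R0"), (2, "R2a"), (2, "R2b")]
print(phase1_match_splits(left, right))
# [(0, ['L0'], ['R0']), (2, ['L2'], ['R2a', 'R2b'])]
```

This only works if both sides compute partition values the same way, which is exactly the "compatible partitions" concern raised above: for `bucket(col, num)`, equal partition IDs mean equal keys only when both sides use the same hash function.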
> >>>>> For instance, with the newly proposed logic to compare whether output
> >>>>> partitionings from both sides of a join operator are compatible, we can
> >>>>> have HiveTableScanExec report a partitioning other than HashPartitioning,
> >>>>> and EnsureRequirements could potentially recognize that and therefore
> >>>>> avoid shuffle if both sides report the same compatible partitioning. In
> >>>>> addition, SPARK-35703, which is part of the SPIP, is also useful in that
> >>>>> it relaxes the constraint for V1 bucket join so that the join keys do not
> >>>>> necessarily have to be identical to the bucket keys.
> >>>>>
> >>>>> > Would aggregate work automatically after the SPIP?
> >>>>>
> >>>>> Yes, it will work as before. This case is already supported by
> >>>>> DataSourcePartitioning in V2 (see SPARK-22389).
> >>>>>
> >>>>> > Any major use cases in mind except Hive bucketed table?
> >>>>>
> >>>>> Our first use case is Apache Iceberg. In addition to that, we also want
> >>>>> to add support for Spark's built-in file data sources.
> >>>>>
> >>>>> Thanks,
> >>>>> Chao
> >>>>>
> >>>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
> >>>>>
> >>>>>> +1 for this. This is an exciting movement to efficiently read bucketed
> >>>>>> tables from other systems (Hive, Trino & Presto)!
> >>>>>>
> >>>>>> Still looking at the details, but I have some early questions:
> >>>>>>
> >>>>>> 1. Is migrating the Hive table read path to data source v2 a
> >>>>>> prerequisite of this SPIP?
> >>>>>>
> >>>>>> The Hive table read path is currently a mix of data source v1 (for the
> >>>>>> Parquet & ORC file formats only) and the legacy Hive code path
> >>>>>> (HiveTableScanExec). In the SPIP, I am seeing we only make changes for
> >>>>>> data source v2, so I am wondering how this would work with the existing
> >>>>>> Hive table read path.
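The SPARK-35703 relaxation Chao mentions above can be illustrated with a small check. This sketch assumes the relaxed rule is that shuffle can be avoided when the bucket keys are a subset of the join keys (my reading of the relaxation; the function name and rule direction here are illustrative, not Spark internals).

```python
# Sketch of the relaxed bucket-join constraint: previously the join keys
# had to be identical to the bucket keys; under the relaxation sketched
# here, the shuffle can be avoided whenever the bucket keys are a subset
# of the join keys. Illustrative only.

def can_avoid_shuffle(join_keys, bucket_keys):
    # Rows that agree on all join keys necessarily agree on the bucket
    # keys, so matching rows land in the same bucket on both sides.
    return set(bucket_keys) <= set(join_keys)


print(can_avoid_shuffle(["a", "b"], ["a", "b"]))  # True (exact match, old rule)
print(can_avoid_shuffle(["a", "b"], ["a"]))       # True (relaxed rule)
print(can_avoid_shuffle(["a"], ["a", "b"]))       # False (bucket key not joined on)
```

The last case must stay False: if the tables are bucketed on `(a, b)` but the join is only on `a`, matching rows can sit in different buckets.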
> >>>>>> In addition, just FYI, support for writing Hive bucketed tables was
> >>>>>> merged into master recently (SPARK-19256
> >>>>>> <https://issues.apache.org/jira/browse/SPARK-19256> has details).
> >>>>>>
> >>>>>> 2. Would aggregate work automatically after the SPIP?
> >>>>>>
> >>>>>> Another major benefit of having a bucketed table is avoiding shuffle
> >>>>>> before aggregate. Just want to bring to our attention that it would be
> >>>>>> great to consider aggregate as well when doing this proposal.
> >>>>>>
> >>>>>> 3. Any major use cases in mind except Hive bucketed table?
> >>>>>>
> >>>>>> Just curious if there are any other use cases we are targeting as part
> >>>>>> of the SPIP.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Cheng Su
> >>>>>>
> >>>>>> *From: *Ryan Blue <b...@apache.org>
> >>>>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
> >>>>>> *To: *John Zhuge <jzh...@apache.org>
> >>>>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
> >>>>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon
> >>>>>> Hyun <dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>,
> >>>>>> Wenchen Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>,
> >>>>>> dev <dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
> >>>>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data
> >>>>>> Source V2
> >>>>>>
> >>>>>> Instead of commenting on the doc, could we keep discussion here on
> >>>>>> the dev list please? That way more people can follow it and there is
> >>>>>> more room for discussion. Comment threads have a very small area and
> >>>>>> easily become hard to follow.
> >>>>>>
> >>>>>> Ryan
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
> >>>>>>
> >>>>>> +1 Nicely done!
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
> >>>>>>
> >>>>>> Oops, sorry. I just fixed the permission setting.
> >>>>>>
> >>>>>> Thanks everyone for the positive support!
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> wrote:
> >>>>>>
> >>>>>> +1 to this SPIP and nice writeup of the design doc!
> >>>>>>
> >>>>>> Can we open comment permission in the doc so that we can discuss
> >>>>>> details there?
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
> >>>>>>
> >>>>>> Seems making sense to me.
> >>>>>> Would be great to have some feedback from people such as @Wenchen Fan
> >>>>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
> >>>>>> <angers....@gmail.com>.
> >>>>>>
> >>>>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
> >>>>>>
> >>>>>> +1 for this SPIP.
> >>>>>>
> >>>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com> wrote:
> >>>>>>
> >>>>>> +1. Thanks for lifting the current restrictions on bucket join and
> >>>>>> making this more generalized.
> >>>>>>
> >>>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
> >>>>>>
> >>>>>> +1 from me as well. Thanks Chao for doing so much to get it to this
> >>>>>> point!
> >>>>>>
> >>>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
> >>>>>>
> >>>>>> +1 on this SPIP.
> >>>>>>
> >>>>>> This is a more generalized version of bucketed tables and bucketed
> >>>>>> joins, which can eliminate very expensive data shuffles when joining,
> >>>>>> and many users in the Apache Spark community have wanted this feature
> >>>>>> for a long time!
> >>>>>>
> >>>>>> Thank you, Ryan and Chao, for working on this, and I look forward to
> >>>>>> it as a new feature in Spark 3.3
> >>>>>>
> >>>>>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1
> >>>>>>
> >>>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
> >>>>>> >
> >>>>>> > Hi,
> >>>>>> >
> >>>>>> > Ryan and I drafted a design doc to support a new type of join:
> >>>>>> storage partitioned join which covers bucket join support for
> >>>>>> DataSourceV2 but is more general. The goal is to let Spark leverage
> >>>>>> distribution properties reported by data sources and eliminate shuffle
> >>>>>> whenever possible.
> >>>>>> >
> >>>>>> > Design doc:
> >>>>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
> >>>>>> (includes a POC link at the end)
> >>>>>> >
> >>>>>> > We'd like to start a discussion on the doc and any feedback is
> >>>>>> welcome!
> >>>>>> >
> >>>>>> > Thanks,
> >>>>>> > Chao
> >>>>>>
> >>>>>> --
> >>>>>> Ryan Blue
> >>>>>>
> >>>>>> --
> >>>>>> John Zhuge
> >>>>>>
> >>>>>> --
> >>>>>> Ryan Blue
> >>>
> >>> --
> >>> Ryan Blue
> >
> > --
> > Ryan Blue

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org