Thanks all for your inputs here!

It seems the discussion has settled. I will be the shepherd for the SPIP and
will call for a vote on it in a new thread.

On 2021/10/28 13:05:53, Wenchen Fan <cloud0...@gmail.com> wrote: 
> Thanks for the explanation! It makes sense to always resolve the logical
> transforms to concrete implementations, and check the concrete
> implementations to decide compatible partitions. We can discuss more
> details in the PR later.
> 
> On Thu, Oct 28, 2021 at 4:14 AM Ryan Blue <b...@apache.org> wrote:
> 
> > The transform expressions in v2 are logical, not concrete implementations.
> > Even days may have different implementations -- the only expectation is
> > that the partitions are day-sized. For example, you could use a transform
> > that splits days at UTC 00:00, or uses some other day boundary.
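
To make the point about day boundaries concrete, here is a minimal sketch in plain Python (not Spark code). The function names `days_utc` and `days_with_offset` and the 8-hour offset are assumptions made up for illustration. Both functions are valid "day-sized" transforms, yet they assign the same timestamp to different partition values.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_DAY = timedelta(days=1)


def days_utc(ts: datetime) -> int:
    """Day ordinal since the epoch, splitting days at UTC 00:00."""
    return (ts - EPOCH) // ONE_DAY


def days_with_offset(ts: datetime, offset_hours: int) -> int:
    """Day ordinal with the day boundary shifted by a fixed offset (hypothetical)."""
    return (ts + timedelta(hours=offset_hours) - EPOCH) // ONE_DAY


# A late-evening UTC timestamp falls on different "days" under the two transforms.
ts = datetime(2021, 10, 28, 22, 30, tzinfo=timezone.utc)
utc_day = days_utc(ts)
shifted_day = days_with_offset(ts, offset_hours=8)
print(utc_day, shifted_day)
```

Two tables partitioned by these two functions both look like `days(ts)` at the logical level, so comparing the logical transform alone would not be enough to decide that their partitions line up.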
> >
> > Because the expressions are logical, we need to resolve them to
> > implementations at some point, like Chao outlines. We can do that using a
> > FunctionCatalog, although I think it's worth considering adding an
> > interface so that a transform from a Table can be converted into a
> > `BoundFunction` directly. That is easier than defining a way for Spark to
> > query the function catalog.
> >
> > In any case, I'm sure it's easy to understand how this works once you get
> > a concrete implementation.
> >
> > On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan <cloud0...@gmail.com> wrote:
> >
> >> `BucketTransform` is a builtin partition transform in Spark, not a UDF
> >> from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to
> >> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> >> I'm asking this because other v2 sources may also use the builtin
> >> `BucketTransform` but with a different bucket hash function. Or we can
> >> clearly define the bucket hash function of the builtin `BucketTransform` in
> >> the doc.
> >>
> >> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote:
> >>
> >>> Two v2 sources may return different bucket IDs for the same value, and
> >>> this breaks the phase 1 split-wise join.
> >>>
> >>> This is why the FunctionCatalog API includes a canonicalName method on
> >>> BoundFunction (docs
> >>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
> >>> That method returns an identifier that can be used to compare whether two
> >>> bucket function instances are the same.
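
As a rough illustration of how a canonical name could be used for this comparison, here is a sketch in plain Python. It is not the Spark API: the `BucketFunction` class, its fields, the name format, and the `compatible` helper are all hypothetical. The only idea taken from the thread is that two bucket functions are treated as the same when their canonical identifiers match; the extra check on the bucket count reflects the initial goal of joining sides with the same number of partitions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BucketFunction:
    """Hypothetical stand-in for a bound bucket function reported by a source."""
    hash_name: str     # identifies the hash algorithm (assumed field)
    num_buckets: int

    def canonical_name(self) -> str:
        # Functions that hash values identically must return the same name.
        return f"bucket.{self.hash_name}"


def compatible(left: BucketFunction, right: BucketFunction) -> bool:
    """Treat two sides as compatible only if the function and bucket count match."""
    return (left.canonical_name() == right.canonical_name()
            and left.num_buckets == right.num_buckets)


source_a = BucketFunction(hash_name="hash_v1", num_buckets=16)
source_b = BucketFunction(hash_name="hash_v1", num_buckets=16)
source_c = BucketFunction(hash_name="hash_v2", num_buckets=16)

print(compatible(source_a, source_b))  # same function on both sides
print(compatible(source_a, source_c))  # different hash, so a shuffle is still needed
```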
> >>>
> >>>
> >>>    1. Can we apply this idea to partitioned file source tables
> >>>    (non-bucketed) as well?
> >>>
> >>> What do you mean here? The design doc discusses transforms like days(ts)
> >>> that can be supported in the future. Is that what you’re asking about? Or
> >>> are you referring to v1 file sources? I think the goal is to support v2,
> >>> since v1 doesn’t have reliable behavior.
> >>>
> >>> Note that the initial implementation goal is to support bucketing since
> >>> that’s an easier case because both sides have the same number of
> >>> partitions. More complex storage-partitioned joins can be implemented 
> >>> later.
> >>>
> >>>
> >>>    2. What if the table has many partitions? Shall we apply certain
> >>>    join algorithms in the phase 1 split-wise join as well? Or even launch
> >>>    a Spark job to do so?
> >>>
> >>> I think that this proposal opens up a lot of possibilities, like what
> >>> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
> >>> heuristics for choosing how and when to use storage partitioning in joins.
> >>> As I said above, bucketing is a great way to get started because it fills
> >>> an existing gap. More complex use cases can be supported over time.
> >>>
> >>> Ryan
> >>>
> >>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:
> >>>
> >>>> IIUC, the general idea is to let each input split report its partition
> >>>> value, and Spark can perform the join in two phases:
> >>>> 1. join the input splits from left and right tables according to their
> >>>> partition values and join keys, at the driver side.
> >>>> 2. for each joined pair of input splits (or a group of splits), launch a
> >>>> Spark task to join the rows.
> >>>>
> >>>> My major concern is about how to define "compatible partitions". Things
> >>>> like `days(ts)` are straightforward: the same timestamp value always
> >>>> results in the same partition value, in any v2 source. `bucket(col,
> >>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
> >>>> sources may return different bucket IDs for the same value, and this
> >>>> breaks the phase 1 split-wise join.
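
The two phases and the failure mode described above can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's implementation: splits are in-memory lists, and `mod_bucket` and `scaled_bucket` are made-up hash functions standing in for two sources that bucket differently.

```python
from collections import defaultdict


def split_by_bucket(rows, bucket_fn, num_buckets):
    """Group (key, value) rows into splits keyed by the bucket ID a source reports."""
    splits = defaultdict(list)
    for key, value in rows:
        splits[bucket_fn(key, num_buckets)].append((key, value))
    return splits


def storage_partitioned_join(left_splits, right_splits):
    """Phase 1 pairs splits by partition value; phase 2 joins rows within each pair."""
    joined = []
    # Phase 1 (driver side): only splits reporting the same bucket ID are paired.
    for bucket_id in sorted(left_splits.keys() & right_splits.keys()):
        # Phase 2 (one task per pair): a simple hash join on the key.
        right_rows = defaultdict(list)
        for key, value in right_splits[bucket_id]:
            right_rows[key].append(value)
        for key, value in left_splits[bucket_id]:
            for other in right_rows.get(key, []):
                joined.append((key, value, other))
    return joined


def mod_bucket(key, n):
    return key % n


def scaled_bucket(key, n):
    # A different (hypothetical) hash that never agrees with mod_bucket when n == 4.
    return (key * 3 + 1) % n


left = [(k, f"l{k}") for k in range(8)]
right = [(k, f"r{k}") for k in range(8)]

same_fn = storage_partitioned_join(
    split_by_bucket(left, mod_bucket, 4), split_by_bucket(right, mod_bucket, 4))
mixed_fn = storage_partitioned_join(
    split_by_bucket(left, mod_bucket, 4), split_by_bucket(right, scaled_bucket, 4))

print(len(same_fn), len(mixed_fn))
```

When both sides use the same bucket function, every key finds its match inside the paired splits. When the functions differ, matching keys land in splits that are never paired, so rows are silently dropped rather than an error being raised.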
> >>>>
> >>>> And two questions for further improvements:
> >>>> 1. Can we apply this idea to partitioned file source tables
> >>>> (non-bucketed) as well?
> >>>> 2. What if the table has many partitions? Shall we apply certain join
> >>>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
> >>>> job to do so?
> >>>>
> >>>> Thanks,
> >>>> Wenchen
> >>>>
> >>>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
> >>>>
> >>>>> Thanks Cheng for the comments.
> >>>>>
> >>>>> > Is migrating Hive table read path to data source v2, being a
> >>>>> prerequisite of this SPIP
> >>>>>
> >>>>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
> >>>>> Hive eventually moves to use the V2 API. With that said, I think some of
> >>>>> the ideas could be useful for V1 Hive support as well. For instance, with
> >>>>> the newly proposed logic to compare whether output partitionings from
> >>>>> both sides of a join operator are compatible, we can have
> >>>>> HiveTableScanExec report a partitioning other than HashPartitioning, and
> >>>>> EnsureRequirements could potentially recognize that and therefore avoid
> >>>>> a shuffle if both sides report compatible partitionings. In addition,
> >>>>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
> >>>>> the constraint for V1 bucket join so that the join keys do not
> >>>>> necessarily have to be identical to the bucket keys.
> >>>>>
> >>>>> > Would aggregate work automatically after the SPIP?
> >>>>>
> >>>>> Yes, it will work as before. This case is already supported by
> >>>>> DataSourcePartitioning in V2 (see SPARK-22389).
> >>>>>
> >>>>> > Any major use cases in mind except Hive bucketed table?
> >>>>>
> >>>>> Our first use case is Apache Iceberg. In addition to that, we also want
> >>>>> to add support for Spark's built-in file data sources.
> >>>>>
> >>>>> Thanks,
> >>>>> Chao
> >>>>>
> >>>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
> >>>>>
> >>>>>> +1 for this. This is an exciting move toward efficiently reading
> >>>>>> bucketed tables from other systems (Hive, Trino & Presto)!
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> I am still looking at the details but have some early questions:
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>    1. Is migrating the Hive table read path to data source v2 a
> >>>>>>    prerequisite of this SPIP?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> The Hive table read path is currently a mix of data source v1 (for the
> >>>>>> Parquet & ORC file formats only) and the legacy Hive code path
> >>>>>> (HiveTableScanExec). In the SPIP, I see that we only make changes for
> >>>>>> data source v2, so I am wondering how this would work with the existing
> >>>>>> Hive table read path. In addition, just FYI, support for writing Hive
> >>>>>> bucketed tables was merged into master recently (SPARK-19256
> >>>>>> <https://issues.apache.org/jira/browse/SPARK-19256> has details).
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>    2. Would aggregate work automatically after the SPIP?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Another major benefit of having bucketed tables is avoiding the shuffle
> >>>>>> before an aggregate. I just want to bring to our attention that it would
> >>>>>> be great to consider aggregates as well in this proposal.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>    3. Any major use cases in mind except Hive bucketed tables?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Just curious whether there are any other use cases we are targeting as
> >>>>>> part of the SPIP.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Cheng Su
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> *From: *Ryan Blue <b...@apache.org>
> >>>>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
> >>>>>> *To: *John Zhuge <jzh...@apache.org>
> >>>>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
> >>>>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon
> >>>>>> Hyun <dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>,
> >>>>>> Wenchen Fan <wenc...@databricks.com>, angers zhu <
> >>>>>> angers....@gmail.com>, dev <dev@spark.apache.org>, huaxin gao <
> >>>>>> huaxin.ga...@gmail.com>
> >>>>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data
> >>>>>> Source V2
> >>>>>>
> >>>>>> Instead of commenting on the doc, could we keep discussion here on
> >>>>>> the dev list please? That way more people can follow it and there is
> >>>>>> more room for discussion. Comment threads have a very small area and
> >>>>>> easily become hard to follow.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Ryan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
> >>>>>>
> >>>>>> +1  Nicely done!
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
> >>>>>>
> >>>>>> Oops, sorry. I just fixed the permission setting.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Thanks everyone for the positive support!
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> +1 to this SPIP and nice writeup of the design doc!
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Can we open comment permission in the doc so that we can discuss
> >>>>>> details there?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> This makes sense to me.
> >>>>>>
> >>>>>> Would be great to have some feedback from people such as @Wenchen Fan
> >>>>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
> >>>>>> <angers....@gmail.com>.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> +1 for this SPIP.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> +1. Thanks for lifting the current restrictions on bucket join and
> >>>>>> making this more generalized.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
> >>>>>>
> >>>>>> +1 from me as well. Thanks Chao for doing so much to get it to this
> >>>>>> point!
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
> >>>>>>
> >>>>>> +1 on this SPIP.
> >>>>>>
> >>>>>> This is a more generalized version of bucketed tables and bucketed
> >>>>>> joins which can eliminate very expensive data shuffles during joins, and
> >>>>>> many users in the Apache Spark community have wanted this feature for
> >>>>>> a long time!
> >>>>>>
> >>>>>> Thank you, Ryan and Chao, for working on this, and I look forward to
> >>>>>> it as a new feature in Spark 3.3
> >>>>>>
> >>>>>> DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
> >>>>>>
> >>>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
> >>>>>> >
> >>>>>> > Hi,
> >>>>>> >
> >>>>>> > Ryan and I drafted a design doc to support a new type of join:
> >>>>>> > storage partitioned join, which covers bucket join support for
> >>>>>> > DataSourceV2 but is more general. The goal is to let Spark leverage
> >>>>>> > distribution properties reported by data sources and eliminate shuffle
> >>>>>> > whenever possible.
> >>>>>> >
> >>>>>> > Design doc:
> >>>>>> > https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
> >>>>>> > (includes a POC link at the end)
> >>>>>> >
> >>>>>> > We'd like to start a discussion on the doc and any feedback is
> >>>>>> > welcome!
> >>>>>> >
> >>>>>> > Thanks,
> >>>>>> > Chao
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> Ryan Blue
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> John Zhuge
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>>
> >>>>>> Ryan Blue
> >>>>>>
> >>>>>
> >>>
> >>> --
> >>> Ryan Blue
> >>>
> >>
> >
> > --
> > Ryan Blue
> >
> 
