`BucketTransform` is a builtin partition transform in Spark, rather than a
UDF from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to
represent its bucket transform, or the Spark builtin `BucketTransform`?
I'm asking because other v2 sources may also use the builtin
`BucketTransform` but with a different bucket hash function. Alternatively,
we could clearly define the bucket hash function of the builtin
`BucketTransform` in the doc.
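
To make the concern concrete, here is a minimal sketch in plain Python (not
Spark code). Everything in it is an assumption made for illustration: the
`BucketFunction` class, the two toy hash functions, and the `compatible`
check are hypothetical and only model the idea of comparing bucket functions
by a canonical name, as `canonicalName` is described in the quoted reply
below. They are not the hash functions used by Spark, Iceberg, or any other
source.

```python
from dataclasses import dataclass
from typing import Callable


def byte_sum_hash(value: str) -> int:
    """Toy hash for hypothetical source A: sum of the UTF-8 bytes."""
    return sum(value.encode("utf-8"))


def poly31_hash(value: str) -> int:
    """Toy hash for hypothetical source B: base-31 polynomial over the bytes."""
    h = 0
    for b in value.encode("utf-8"):
        h = (h * 31 + b) & 0xFFFFFFFF  # keep the value within 32 bits
    return h


@dataclass(frozen=True)
class BucketFunction:
    """Hypothetical stand-in for a source's bucket transform."""
    canonical_name: str            # identifier used to compare functions
    num_buckets: int
    hash_fn: Callable[[str], int]

    def bucket(self, value: str) -> int:
        return self.hash_fn(value) % self.num_buckets


def compatible(left: BucketFunction, right: BucketFunction) -> bool:
    """Two sides can skip the shuffle only if they bucket values identically.

    Assumption: the same canonical name implies the same hash function.
    """
    return (left.canonical_name == right.canonical_name
            and left.num_buckets == right.num_buckets)


source_a = BucketFunction("source_a.bucket", 8, byte_sum_hash)
source_b = BucketFunction("source_b.bucket", 8, poly31_hash)

# Both sources declare bucket(col, 8), yet the same key lands in
# different buckets, so pairing splits by bucket ID would be wrong.
print(source_a.bucket("ab"), source_b.bucket("ab"))
print(compatible(source_a, source_b))
```

In this sketch both sides declare 8 buckets, but the key "ab" is assigned to
different bucket IDs, which is the case where matching splits by bucket ID
alone would silently drop join results. Comparing by a canonical name is one
way a planner could reject such a pairing.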

On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote:

> Two v2 sources may return different bucket IDs for the same value, and
> this breaks the phase 1 split-wise join.
>
> This is why the FunctionCatalog included a canonicalName method (docs
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
> That method returns an identifier that can be used to compare whether two
> bucket function instances are the same.
>
>
>    1. Can we apply this idea to partitioned file source tables
>    (non-bucketed) as well?
>
> What do you mean here? The design doc discusses transforms like days(ts)
> that can be supported in the future. Is that what you’re asking about? Or
> are you referring to v1 file sources? I think the goal is to support v2,
> since v1 doesn’t have reliable behavior.
>
> Note that the initial implementation goal is to support bucketing since
> that’s an easier case because both sides have the same number of
> partitions. More complex storage-partitioned joins can be implemented later.
>
>
>    2. What if the table has many partitions? Shall we apply certain join
>    algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>    job to do so?
>
> I think that this proposal opens up a lot of possibilities, like what
> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
> heuristics for choosing how and when to use storage partitioning in joins.
> As I said above, bucketing is a great way to get started because it fills
> an existing gap. More complex use cases can be supported over time.
>
> Ryan
>
> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> IIUC, the general idea is to let each input split report its partition
>> value, and Spark can perform the join in two phases:
>> 1. join the input splits from left and right tables according to their
>> partitions values and join keys, at the driver side.
>> 2. for each joined input splits pair (or a group of splits), launch a
>> Spark task to join the rows.
>>
>> My major concern is about how to define "compatible partitions". Things
>> like `days(ts)` are straightforward: the same timestamp value always
>> results in the same partition value, in whatever v2 sources. `bucket(col,
>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>> sources may return different bucket IDs for the same value, and this breaks
>> the phase 1 split-wise join.
>>
>> And two questions for further improvements:
>> 1. Can we apply this idea to partitioned file source tables
>> (non-bucketed) as well?
>> 2. What if the table has many partitions? Shall we apply certain join
>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>> job to do so?
>>
>> Thanks,
>> Wenchen
>>
>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
>>
>>> Thanks Cheng for the comments.
>>>
>>> > Is migrating the Hive table read path to data source v2 a
>>> prerequisite of this SPIP?
>>>
>>> Yes, this SPIP only aims at DataSourceV2, so it will obviously help if
>>> Hive eventually moves to the V2 API. That said, I think some of the
>>> ideas could be useful for V1 Hive support as well. For instance, with the
>>> newly proposed logic to compare whether output partitionings from both
>>> sides of a join operator are compatible, we can have HiveTableScanExec
>>> report a partitioning other than HashPartitioning, and
>>> EnsureRequirements could potentially recognize that and therefore avoid a
>>> shuffle if both sides report the same compatible partitioning. In addition,
>>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>>> the constraint for V1 bucket join so that the join keys do not necessarily
>>> have to be identical to the bucket keys.
>>>
>>> > Would aggregate work automatically after the SPIP?
>>>
>>> Yes it will work as before. This case is already supported by
>>> DataSourcePartitioning in V2 (see SPARK-22389).
>>>
>>> > Any major use cases in mind except Hive bucketed table?
>>>
>>> Our first use case is Apache Iceberg. In addition to that, we also want
>>> to add support for Spark's built-in file data sources.
>>>
>>> Thanks,
>>> Chao
>>>
>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>>>
>>>> +1 for this. This is an exciting move toward efficiently reading bucketed
>>>> tables from other systems (Hive, Trino & Presto)!
>>>>
>>>>
>>>>
>>>> Still looking at the details, but I have some early questions:
>>>>
>>>>
>>>>
>>>>    1. Is migrating the Hive table read path to data source v2 a
>>>>    prerequisite of this SPIP?
>>>>
>>>>
>>>>
>>>> The Hive table read path is currently a mix of data source v1 (for
>>>> Parquet & ORC file formats only) and the legacy Hive code path
>>>> (HiveTableScanExec). In the SPIP, I see we only make changes for data
>>>> source v2, so I'm wondering how this would work with the existing Hive
>>>> table read path. In addition, just FYI, support for writing Hive bucketed
>>>> tables was merged into master recently (
>>>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has
>>>> details).
>>>>
>>>>
>>>>
>>>>    2. Would aggregate work automatically after the SPIP?
>>>>
>>>>
>>>>
>>>> Another major benefit of having bucketed tables is avoiding a shuffle
>>>> before aggregation. Just want to bring to our attention that it would be
>>>> great to consider aggregates as well in this proposal.
>>>>
>>>>
>>>>
>>>>    3. Any major use cases in mind except Hive bucketed table?
>>>>
>>>>
>>>>
>>>> Just curious if there are any other use cases we are targeting as part
>>>> of the SPIP.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Cheng Su
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From: *Ryan Blue <b...@apache.org>
>>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>>>> *To: *John Zhuge <jzh...@apache.org>
>>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun <
>>>> dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Wenchen
>>>> Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>, dev <
>>>> dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data
>>>> Source V2
>>>>
>>>> Instead of commenting on the doc, could we keep discussion here on the
>>>> dev list please? That way more people can follow it and there is more room
>>>> for discussion. Comment threads have a very small area and easily become
>>>> hard to follow.
>>>>
>>>>
>>>>
>>>> Ryan
>>>>
>>>>
>>>>
>>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>>>
>>>> +1  Nicely done!
>>>>
>>>>
>>>>
>>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>>
>>>> Oops, sorry. I just fixed the permission setting.
>>>>
>>>>
>>>>
>>>> Thanks everyone for the positive support!
>>>>
>>>>
>>>>
>>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com>
>>>> wrote:
>>>>
>>>> +1 to this SPIP and nice writeup of the design doc!
>>>>
>>>>
>>>>
>>>> Can we open comment permission in the doc so that we can discuss
>>>> details there?
>>>>
>>>>
>>>>
>>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com>
>>>> wrote:
>>>>
>>>> This makes sense to me.
>>>>
>>>> Would be great to have some feedback from people such as @Wenchen Fan
>>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
>>>> <angers....@gmail.com>.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
>>>> wrote:
>>>>
>>>> +1 for this SPIP.
>>>>
>>>>
>>>>
>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
>>>> wrote:
>>>>
>>>> +1. Thanks for lifting the current restrictions on bucket join and
>>>> making this more generalized.
>>>>
>>>>
>>>>
>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>>>
>>>> +1 from me as well. Thanks Chao for doing so much to get it to this
>>>> point!
>>>>
>>>>
>>>>
>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>>>
>>>> +1 on this SPIP.
>>>>
>>>> This is a more generalized version of bucketed tables and bucketed
>>>> joins, which can eliminate very expensive data shuffles when joining,
>>>> and many users in the Apache Spark community have wanted this feature
>>>> for a long time!
>>>>
>>>> Thank you, Ryan and Chao, for working on this, and I look forward to
>>>> it as a new feature in Spark 3.3
>>>>
>>>> DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
>>>>
>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Ryan and I drafted a design doc to support a new type of join:
>>>> storage partitioned join which covers bucket join support for DataSourceV2
>>>> but is more general. The goal is to let Spark leverage distribution
>>>> properties reported by data sources and eliminate shuffle whenever 
>>>> possible.
>>>> >
>>>> > Design doc:
>>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>>>> (includes a POC link at the end)
>>>> >
>>>> > We'd like to start a discussion on the doc and any feedback is
>>>> welcome!
>>>> >
>>>> > Thanks,
>>>> > Chao
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> John Zhuge
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>
>
> --
> Ryan Blue
>
