The transform expressions in v2 are logical, not concrete implementations.
Even `days` may have different implementations -- the only expectation is
that the partitions are day-sized. For example, you could use a transform
that splits days at UTC 00:00, or one that uses some other day boundary.

Because the expressions are logical, we need to resolve them to
implementations at some point, as Chao outlines. We can do that using a
`FunctionCatalog`, although I think it's worth considering adding an
interface so that a transform from a Table can be converted into a
`BoundFunction` directly. That would be easier than defining a way for
Spark to query the function catalog.
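
Here's a rough sketch of both options in Scala. The first binds the
transform by name through a `FunctionCatalog`; the second is the
hypothetical direct-conversion interface (`TransformFunctionProvider` and
`bindTransform` are made-up names, not existing API):

  import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
  import org.apache.spark.sql.connector.catalog.functions.BoundFunction
  import org.apache.spark.sql.connector.expressions.Transform
  import org.apache.spark.sql.types.StructType

  // Option 1: resolve the transform by name through the FunctionCatalog.
  def resolveViaCatalog(
      catalog: FunctionCatalog,
      transform: Transform,
      inputType: StructType): BoundFunction = {
    val ident = Identifier.of(Array.empty[String], transform.name)
    catalog.loadFunction(ident).bind(inputType)
  }

  // Option 2 (hypothetical): let the Table hand back a bound function.
  trait TransformFunctionProvider {
    def bindTransform(transform: Transform, inputType: StructType): BoundFunction
  }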

In any case, I'm sure this will be easier to understand once there is a
concrete implementation to look at.

On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan <cloud0...@gmail.com> wrote:

> `BucketTransform` is a builtin partition transform in Spark, not a UDF
> from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to
> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> I'm asking because other v2 sources may also use the builtin
> `BucketTransform` but with a different bucket hash function. Alternatively,
> we could clearly define the bucket hash function of the builtin
> `BucketTransform` in the doc.
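>
> For context, here is a minimal sketch of a v2 table reporting the builtin
> bucket transform (the table class and schema are made up). Note that
> nothing in the reported transform says which hash function produced the
> bucket IDs on disk:
>
>   import java.util
>   import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
>   import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
>   import org.apache.spark.sql.types.StructType
>
>   class MyBucketedTable extends Table {
>     override def name(): String = "my_bucketed_table"
>     override def schema(): StructType = new StructType().add("id", "long")
>     override def capabilities(): util.Set[TableCapability] =
>       util.Collections.emptySet()
>     // Reports Spark's builtin bucket transform over `id` with 16 buckets.
>     override def partitioning(): Array[Transform] =
>       Array(Expressions.bucket(16, "id"))
>   }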
>
> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote:
>
>> Two v2 sources may return different bucket IDs for the same value, and
>> this breaks the phase 1 split-wise join.
>>
>> This is why the FunctionCatalog API includes a canonicalName method on
>> BoundFunction (docs
>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
>> That method returns an identifier that can be used to compare whether two
>> bucket function instances are the same.
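>>
>> A minimal sketch of a bucket function that reports a canonical name (the
>> hash logic below is a stand-in, not any source's actual bucket function):
>>
>>   import org.apache.spark.sql.catalyst.InternalRow
>>   import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
>>   import org.apache.spark.sql.types.{DataType, IntegerType, LongType}
>>
>>   class BucketLong(numBuckets: Int) extends ScalarFunction[Int] {
>>     override def inputTypes(): Array[DataType] = Array(LongType)
>>     override def resultType(): DataType = IntegerType
>>     override def name(): String = "bucket"
>>     // Two instances are interchangeable only if these identifiers match.
>>     override def canonicalName(): String =
>>       s"com.example.bucket(long, $numBuckets)"
>>     override def produceResult(input: InternalRow): Int =
>>       (java.lang.Long.hashCode(input.getLong(0)) & Int.MaxValue) % numBuckets
>>   }
>>
>> The planner can then treat the two sides as compatibly bucketed only when
>> left.canonicalName() == right.canonicalName().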
>>
>>
>>    1. Can we apply this idea to partitioned file source tables
>>    (non-bucketed) as well?
>>
>> What do you mean here? The design doc discusses transforms like `days(ts)`
>> that can be supported in the future. Is that what you’re asking about? Or
>> are you referring to v1 file sources? I think the goal is to support v2,
>> since v1 doesn’t have reliable behavior.
>>
>> Note that the initial implementation goal is to support bucketing, since
>> that’s the easier case: both sides have the same number of partitions.
>> More complex storage-partitioned joins can be implemented later.
>>
>>
>>    2. What if the table has many partitions? Shall we apply certain join
>>    algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>    job to do so?
>>
>> I think that this proposal opens up a lot of possibilities, like what
>> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
>> heuristics for choosing how and when to use storage partitioning in joins.
>> As I said above, bucketing is a great way to get started because it fills
>> an existing gap. More complex use cases can be supported over time.
>>
>> Ryan
>>
>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> IIUC, the general idea is to let each input split report its partition
>>> value, so that Spark can perform the join in two phases:
>>> 1. join the input splits from the left and right tables according to
>>> their partition values and join keys, on the driver side.
>>> 2. for each joined pair of input splits (or group of splits), launch a
>>> Spark task to join the rows.
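>>>
>>> In pseudocode, phase 1 on the driver might look like this (a conceptual
>>> sketch only; InputSplit and partitionValue are placeholder names):
>>>
>>>   case class InputSplit(partitionValue: Any /* plus file ranges, etc. */)
>>>
>>>   // Phase 1: group splits by partition value and pair up matching groups.
>>>   def planSplitPairs(
>>>       left: Seq[InputSplit],
>>>       right: Seq[InputSplit]): Seq[(Seq[InputSplit], Seq[InputSplit])] = {
>>>     val leftGroups = left.groupBy(_.partitionValue)
>>>     val rightGroups = right.groupBy(_.partitionValue)
>>>     val keys = leftGroups.keySet intersect rightGroups.keySet // inner join
>>>     keys.toSeq.map(k => (leftGroups(k), rightGroups(k)))
>>>   }
>>>
>>>   // Phase 2: each pair becomes one Spark task that joins the rows.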
>>>
>>> My major concern is about how to define "compatible partitions". Things
>>> like `days(ts)` are straightforward: the same timestamp value always
>>> results in the same partition value, in any v2 source. `bucket(col,
>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>>> sources may return different bucket IDs for the same value, and this
>>> breaks the phase 1 split-wise join.
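>>>
>>> To make that concrete (both hash functions below are hypothetical):
>>>
>>>   // Two sources bucketing the same value with different hash functions.
>>>   def bucketA(v: Long, n: Int): Int =
>>>     (java.lang.Long.hashCode(v) & Int.MaxValue) % n
>>>   def bucketB(v: Long, n: Int): Int =
>>>     (((v * 0x9E3779B97F4A7C15L) >>> 33).toInt & Int.MaxValue) % n
>>>
>>>   bucketA(42L, 16)  // 10
>>>   bucketB(42L, 16)  // likely a different bucket for the same value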
>>>
>>> And two questions for further improvements:
>>> 1. Can we apply this idea to partitioned file source tables
>>> (non-bucketed) as well?
>>> 2. What if the table has many partitions? Shall we apply certain join
>>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>> job to do so?
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>
>>>> Thanks Cheng for the comments.
>>>>
>>>> > Is migrating the Hive table read path to data source v2 a
>>>> > prerequisite of this SPIP?
>>>>
>>>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
>>>> Hive eventually moves to the V2 API. That said, I think some of the
>>>> ideas could be useful for V1 Hive support as well. For instance, with the
>>>> newly proposed logic to compare whether the output partitionings from both
>>>> sides of a join operator are compatible, we could have HiveTableScanExec
>>>> report a partitioning other than HashPartitioning, and
>>>> EnsureRequirements could potentially recognize that and therefore avoid a
>>>> shuffle if both sides report the same compatible partitioning. In addition,
>>>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>>>> the constraint for V1 bucket joins so that the join keys need not be
>>>> identical to the bucket keys.
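>>>>
>>>> A tiny sketch of that comparison idea (`compatible` is a stand-in for
>>>> the richer semantic comparison the SPIP proposes):
>>>>
>>>>   import org.apache.spark.sql.catalyst.plans.physical.Partitioning
>>>>
>>>>   // EnsureRequirements would only insert an exchange when the two
>>>>   // children's reported partitionings do not compare as compatible.
>>>>   def needsShuffle(left: Partitioning, right: Partitioning): Boolean =
>>>>     !compatible(left, right)
>>>>
>>>>   def compatible(left: Partitioning, right: Partitioning): Boolean =
>>>>     left == right // placeholder; the SPIP defines a semantic comparison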
>>>>
>>>> > Would aggregate work automatically after the SPIP?
>>>>
>>>> Yes, it will work as before. This case is already supported by
>>>> DataSourcePartitioning in V2 (see SPARK-22389).
>>>>
>>>> > Any major use cases in mind besides Hive bucketed tables?
>>>>
>>>> Our first use case is Apache Iceberg. In addition, we also want to add
>>>> support for Spark's built-in file data sources.
>>>>
>>>> Thanks,
>>>> Chao
>>>>
>>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>>>>
>>>>> +1 for this. This is an exciting move toward efficiently reading
>>>>> bucketed tables from other systems (Hive, Trino & Presto)!
>>>>>
>>>>>
>>>>>
>>>>> Still looking at the details but having some early questions:
>>>>>
>>>>>
>>>>>
>>>>>    1. Is migrating the Hive table read path to data source v2 a
>>>>>    prerequisite of this SPIP?
>>>>>
>>>>>
>>>>>
>>>>> The Hive table read path is currently a mix of data source v1 (for the
>>>>> Parquet & ORC file formats only) and the legacy Hive code path
>>>>> (HiveTableScanExec). In the SPIP, I see we only make changes for data
>>>>> source v2, so I'm wondering how this would work with the existing Hive
>>>>> table read path. In addition, just FYI, support for writing Hive bucketed
>>>>> tables was merged to master recently (
>>>>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has
>>>>> details).
>>>>>
>>>>>
>>>>>
>>>>>    2. Would aggregate work automatically after the SPIP?
>>>>>
>>>>>
>>>>>
>>>>> Another major benefit of having bucketed tables is avoiding a shuffle
>>>>> before an aggregate. I just want to bring to our attention that it would
>>>>> be great to consider aggregates as well in this proposal.
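>>>>>
>>>>> For example (illustrative only; the table and column names are made up,
>>>>> and `spark` is a SparkSession), an aggregate on the bucket column could
>>>>> consume the scan's reported partitioning and skip the exchange:
>>>>>
>>>>>   spark.table("db.bucketed_events")
>>>>>     .groupBy("id") // id is the bucket column
>>>>>     .count()       // no shuffle if the scan reports a compatible
>>>>>                    // partitioning on id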
>>>>>
>>>>>
>>>>>
>>>>>    3. Any major use cases in mind besides Hive bucketed tables?
>>>>>
>>>>>
>>>>>
>>>>> Just curious if there are any other use cases we are targeting as part
>>>>> of the SPIP.
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Cheng Su
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *From: *Ryan Blue <b...@apache.org>
>>>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>>>>> *To: *John Zhuge <jzh...@apache.org>
>>>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>>>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun
>>>>> <dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>,
>>>>> Wenchen Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>,
>>>>> dev <dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>>>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data
>>>>> Source V2
>>>>>
>>>>> Instead of commenting on the doc, could we keep discussion here on the
>>>>> dev list please? That way more people can follow it and there is more room
>>>>> for discussion. Comment threads have a very small area and easily become
>>>>> hard to follow.
>>>>>
>>>>>
>>>>>
>>>>> Ryan
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>>>>
>>>>> +1  Nicely done!
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>>>
>>>>> Oops, sorry. I just fixed the permission setting.
>>>>>
>>>>>
>>>>>
>>>>> Thanks everyone for the positive support!
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> +1 to this SPIP and nice writeup of the design doc!
>>>>>
>>>>>
>>>>>
>>>>> Can we open comment permission in the doc so that we can discuss
>>>>> details there?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Seems to make sense to me.
>>>>>
>>>>> Would be great to have some feedback from people such as @Wenchen Fan
>>>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
>>>>> <angers....@gmail.com>.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> +1 for this SPIP.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> +1. Thanks for lifting the current restrictions on bucket join and
>>>>> making this more generalized.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>>>>
>>>>> +1 from me as well. Thanks Chao for doing so much to get it to this
>>>>> point!
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>>>>
>>>>> +1 on this SPIP.
>>>>>
>>>>> This is a more generalized version of bucketed tables and bucketed
>>>>> joins, which can eliminate very expensive data shuffles when joining,
>>>>> and many users in the Apache Spark community have wanted this feature
>>>>> for a long time!
>>>>>
>>>>> Thank you, Ryan and Chao, for working on this. I look forward to
>>>>> it as a new feature in Spark 3.3.
>>>>>
>>>>> DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
>>>>>
>>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Ryan and I drafted a design doc to support a new type of join:
>>>>> > storage partitioned join, which covers bucket join support for
>>>>> > DataSourceV2 but is more general. The goal is to let Spark leverage
>>>>> > distribution properties reported by data sources and eliminate
>>>>> > shuffles whenever possible.
>>>>> >
>>>>> > Design doc:
>>>>> > https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>>>>> > (includes a POC link at the end)
>>>>> >
>>>>> > We'd like to start a discussion on the doc and any feedback is
>>>>> > welcome!
>>>>> >
>>>>> > Thanks,
>>>>> > Chao
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ryan Blue
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> John Zhuge
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ryan Blue
>>>>>
>>>>
>>
>> --
>> Ryan Blue
>>
>

-- 
Ryan Blue
