IIUC, the general idea is to let each input split report its partition
value, and Spark can perform the join in two phases:
1. join the input splits from left and right tables according to their
partitions values and join keys, at the driver side.
2. for each joined input splits pair (or a group of splits), launch a Spark
task to join the rows.

My major concern is about how to define "compatible partitions". Things
like `days(ts)` are straightforward: the same timestamp value always
results in the same partition value, in whatever v2 sources. `bucket(col,
num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
sources may return different bucket IDs for the same value, and this breaks
the phase 1 split-wise join.

And two questions for further improvements:
1. Can we apply this idea to partitioned file source tables (non-bucketed)
as well?
2. What if the table has many partitions? Shall we apply certain join
algorithms in the phase 1 split-wise join as well? Or even launch a Spark
job to do so?

Thanks,
Wenchen

On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:

> Thanks Cheng for the comments.
>
> > Is migrating Hive table read path to data source v2, being a
> prerequisite of this SPIP
>
> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
> Hive eventually moves to use V2 API. With that said, I think some of the
> ideas could be useful for V1 Hive support as well. For instance, with the
> newly proposed logic to compare whether output partitionings from both
> sides of a join operator are compatible, we can have HiveTableScanExec to
> report a different partitioning other than HashPartitioning, and
> EnsureRequirements could potentially recognize that and therefore avoid
> shuffle if both sides report the same compatible partitioning. In addition,
> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
> the constraint for V1 bucket join so that the join keys do not necessarily
> be identical to the bucket keys.
>
> > Would aggregate work automatically after the SPIP?
>
> Yes it will work as before. This case is already supported by
> DataSourcePartitioning in V2 (see SPARK-22389).
>
> > Any major use cases in mind except Hive bucketed table?
>
> Our first use case is Apache Iceberg. In addition to that we also want to
> add the support for Spark's built-in file data sources.
>
> Thanks,
> Chao
>
> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>
>> +1 for this. This is exciting movement to efficiently read bucketed table
>> from other systems (Hive, Trino & Presto)!
>>
>>
>>
>> Still looking at the details but having some early questions:
>>
>>
>>
>>    1. Is migrating Hive table read path to data source v2, being a
>>    prerequisite of this SPIP?
>>
>>
>>
>> Hive table read path is currently a mix of data source v1 (for Parquet &
>> ORC file format only), and legacy Hive code path (HiveTableScanExec). In
>> the SPIP, I am seeing we only make change for data source v2, so wondering
>> how this would work with existing Hive table read path. In addition, just
>> FYI, supporting writing Hive bucketed table is merged in master recently (
>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has
>> details).
>>
>>
>>
>>    1. Would aggregate work automatically after the SPIP?
>>
>>
>>
>> Another major benefit for having bucketed table, is to avoid shuffle
>> before aggregate. Just want to bring to our attention that it would be
>> great to consider aggregate as well when doing this proposal.
>>
>>
>>
>>    1. Any major use cases in mind except Hive bucketed table?
>>
>>
>>
>> Just curious if there’s any other use cases we are targeting as part of
>> SPIP.
>>
>>
>>
>> Thanks,
>>
>> Cheng Su
>>
>>
>>
>>
>>
>>
>>
>> *From: *Ryan Blue <b...@apache.org>
>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>> *To: *John Zhuge <jzh...@apache.org>
>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun <
>> dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Wenchen
>> Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>, dev <
>> dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source
>> V2
>>
>> Instead of commenting on the doc, could we keep discussion here on the
>> dev list please? That way more people can follow it and there is more room
>> for discussion. Comment threads have a very small area and easily become
>> hard to follow.
>>
>>
>>
>> Ryan
>>
>>
>>
>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>
>> +1  Nicely done!
>>
>>
>>
>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>
>> Oops, sorry. I just fixed the permission setting.
>>
>>
>>
>> Thanks everyone for the positive support!
>>
>>
>>
>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>> +1 to this SPIP and nice writeup of the design doc!
>>
>>
>>
>> Can we open comment permission in the doc so that we can discuss details
>> there?
>>
>>
>>
>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>>
>> Seems making sense to me.
>>
>> Would be great to have some feedback from people such as @Wenchen Fan
>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
>> <angers....@gmail.com>.
>>
>>
>>
>>
>>
>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
>> wrote:
>>
>> +1 for this SPIP.
>>
>>
>>
>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
>> wrote:
>>
>> +1. Thanks for lifting the current restrictions on bucket join and making
>> this more generalized.
>>
>>
>>
>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>
>> +1 from me as well. Thanks Chao for doing so much to get it to this point!
>>
>>
>>
>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>
>> +1 on this SPIP.
>>
>> This is a more generalized version of bucketed tables and bucketed
>> joins which can eliminate very expensive data shuffles when joins, and
>> many users in the Apache Spark community have wanted this feature for
>> a long time!
>>
>> Thank you, Ryan and Chao, for working on this, and I look forward to
>> it as a new feature in Spark 3.3
>>
>> DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
>>
>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > Ryan and I drafted a design doc to support a new type of join: storage
>> partitioned join which covers bucket join support for DataSourceV2 but is
>> more general. The goal is to let Spark leverage distribution properties
>> reported by data sources and eliminate shuffle whenever possible.
>> >
>> > Design doc:
>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>> (includes a POC link at the end)
>> >
>> > We'd like to start a discussion on the doc and any feedback is welcome!
>> >
>> > Thanks,
>> > Chao
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>>
>>
>>
>> --
>>
>> John Zhuge
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>

Reply via email to