IIUC, the general idea is to let each input split report its partition value,
so that Spark can perform the join in two phases:

1. At the driver side, join the input splits from the left and right tables
according to their partition values and join keys.
2. For each joined pair of input splits (or group of splits), launch a Spark
task to join the rows.
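A rough way to picture the two phases, as a minimal sketch with made-up names
(none of these are real Spark APIs, and real splits would not hold rows in
memory like this):

```python
# Hypothetical sketch of the two-phase storage-partitioned join.
# An input split reports the partition value it covers, plus its rows
# as (join_key, payload) pairs.
def make_split(partition_value, rows):
    return {"partition_value": partition_value, "rows": rows}

# Phase 1 (driver side): group splits from each table by their reported
# partition value and pair up the groups whose values match.
def match_splits(left_splits, right_splits):
    def by_partition(splits):
        groups = {}
        for s in splits:
            groups.setdefault(s["partition_value"], []).append(s)
        return groups
    left, right = by_partition(left_splits), by_partition(right_splits)
    common = sorted(set(left) & set(right))
    return [(left[v], right[v]) for v in common]

# Phase 2 (one task per matched pair): join the actual rows on the join key.
def join_pair(left_group, right_group):
    left_rows = [r for s in left_group for r in s["rows"]]
    right_rows = [r for s in right_group for r in s["rows"]]
    return [(k1, v1, v2)
            for (k1, v1) in left_rows
            for (k2, v2) in right_rows if k1 == k2]
```

Note the whole scheme relies on both sides computing the same partition value
for the same data, which is exactly the "compatible partitions" question below.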
My major concern is how to define "compatible partitions". Things like
`days(ts)` are straightforward: the same timestamp value always results in the
same partition value, in whatever v2 source. `bucket(col, num)` is tricky, as
Spark doesn't define the bucket hash function: two v2 sources may return
different bucket IDs for the same value, which breaks the phase 1 split-wise
join.

And two questions for further improvements:

1. Can we apply this idea to partitioned file source tables (non-bucketed) as
well?
2. What if the table has many partitions? Shall we apply certain join
algorithms in the phase 1 split-wise join as well, or even launch a Spark job
to do so?

Thanks,
Wenchen

On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
> Thanks Cheng for the comments.
>
> > Is migrating Hive table read path to data source v2, being a
> > prerequisite of this SPIP
>
> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
> Hive eventually moves to the V2 API. With that said, I think some of the
> ideas could be useful for V1 Hive support as well. For instance, with the
> newly proposed logic to compare whether the output partitionings from both
> sides of a join operator are compatible, we can have HiveTableScanExec
> report a partitioning other than HashPartitioning, and EnsureRequirements
> could potentially recognize that and therefore avoid the shuffle if both
> sides report the same compatible partitioning. In addition, SPARK-35703,
> which is part of the SPIP, is also useful in that it relaxes the constraint
> for V1 bucket join so that the join keys no longer have to be identical to
> the bucket keys.
>
> > Would aggregate work automatically after the SPIP?
>
> Yes, it will work as before. This case is already supported by
> DataSourcePartitioning in V2 (see SPARK-22389).
>
> > Any major use cases in mind except Hive bucketed table?
>
> Our first use case is Apache Iceberg.
> In addition to that, we also want to add support for Spark's built-in file
> data sources.
>
> Thanks,
> Chao
>
> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>
>> +1 for this. This is an exciting movement to efficiently read bucketed
>> tables from other systems (Hive, Trino & Presto)!
>>
>> Still looking at the details, but I have some early questions:
>>
>> 1. Is migrating the Hive table read path to data source v2 a
>> prerequisite of this SPIP?
>>
>> The Hive table read path is currently a mix of data source v1 (for the
>> Parquet & ORC file formats only) and the legacy Hive code path
>> (HiveTableScanExec). In the SPIP, I see we only make changes for data
>> source v2, so I am wondering how this would work with the existing Hive
>> table read path. In addition, just FYI, support for writing Hive bucketed
>> tables was merged into master recently (SPARK-19256
>> <https://issues.apache.org/jira/browse/SPARK-19256> has the details).
>>
>> 2. Would aggregate work automatically after the SPIP?
>>
>> Another major benefit of having bucketed tables is avoiding the shuffle
>> before an aggregate. Just want to bring to our attention that it would be
>> great to consider aggregate as well when working on this proposal.
>>
>> 3. Any major use cases in mind except Hive bucketed tables?
>>
>> Just curious if there are any other use cases we are targeting as part
>> of the SPIP.
>>
>> Thanks,
>> Cheng Su
>>
>> *From: *Ryan Blue <b...@apache.org>
>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>> *To: *John Zhuge <jzh...@apache.org>
>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun
>> <dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Wenchen
>> Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>, dev
>> <dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source
>> V2
>>
>> Instead of commenting on the doc, could we keep the discussion here on
>> the dev list please? That way more people can follow it and there is more
>> room for discussion. Comment threads have a very small area and easily
>> become hard to follow.
>>
>> Ryan
>>
>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>
>> +1 Nicely done!
>>
>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>
>> Oops, sorry. I just fixed the permission setting.
>>
>> Thanks everyone for the positive support!
>>
>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>> +1 to this SPIP, and nice writeup of the design doc!
>>
>> Can we open comment permission in the doc so that we can discuss details
>> there?
>>
>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>>
>> Makes sense to me.
>> Would be great to have some feedback from people such as @Wenchen Fan
>> <wenc...@databricks.com>, @Cheng Su <chen...@fb.com>, and @angers zhu
>> <angers....@gmail.com>.
>>
>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
>> wrote:
>>
>> +1 for this SPIP.
>>
>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
>> wrote:
>>
>> +1.
>> Thanks for lifting the current restrictions on bucket join and making
>> this more generalized.
>>
>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>
>> +1 from me as well. Thanks Chao for doing so much to get it to this
>> point!
>>
>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>
>> +1 on this SPIP.
>>
>> This is a more generalized version of bucketed tables and bucketed
>> joins, which can eliminate very expensive data shuffles for joins, and
>> many users in the Apache Spark community have wanted this feature for
>> a long time!
>>
>> Thank you, Ryan and Chao, for working on this, and I look forward to
>> it as a new feature in Spark 3.3.
>>
>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1
>>
>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > Ryan and I drafted a design doc to support a new type of join: storage
>> partitioned join, which covers bucket join support for DataSourceV2 but is
>> more general. The goal is to let Spark leverage distribution properties
>> reported by data sources and eliminate shuffles whenever possible.
>> >
>> > Design doc:
>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>> (includes a POC link at the end)
>> >
>> > We'd like to start a discussion on the doc, and any feedback is
>> welcome!
>> >
>> > Thanks,
>> > Chao
>>
>> --
>> Ryan Blue
>>
>> --
>> John Zhuge
>>
>> --
>> Ryan Blue
>
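On the `bucket(col, num)` concern raised earlier in the thread: if two v2
sources bucket with different hash functions, the same join key lands in
different bucket IDs on each side, so matching splits by bucket ID at the
driver pairs up the wrong data. A toy illustration (both hash schemes are
made up; neither is Spark's or Hive's real bucket function):

```python
# Two hypothetical v2 sources with different bucket hash functions.
def bucket_source_a(key, num_buckets):
    # e.g., an identity-style scheme for integer keys
    return key % num_buckets

def bucket_source_b(key, num_buckets):
    # e.g., a scheme with different mixing
    return (key * 31 + 17) % num_buckets

# Key 1 with 8 buckets: source A puts it in bucket 1, source B in bucket 0.
# A driver-side match of "A's bucket 1" with "B's bucket 1" would therefore
# never see the matching row from B, silently dropping join results.
```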