The transform expressions in v2 are logical, not concrete implementations. Even a transform like days may have different implementations -- the only expectation is that the partitions are day-sized. For example, you could use a transform that splits days at UTC 00:00, or one that uses some other day boundary.
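To make that concrete, here is a small illustrative sketch (plain Python, not Spark code; the function names are invented for illustration) of two day transforms that differ only in where the day boundary falls -- both produce day-sized partitions, yet they are different implementations of the same logical transform:

```python
from datetime import datetime, timedelta, timezone

def days_utc(ts: datetime) -> datetime:
    """Day transform that splits days at UTC 00:00."""
    return ts.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)

def days_offset(ts: datetime, boundary_hour: int = 5) -> datetime:
    """Day transform that splits days at a different boundary (here 05:00 UTC),
    by shifting before truncating. Still day-sized partitions."""
    shifted = ts.astimezone(timezone.utc) - timedelta(hours=boundary_hour)
    return shifted.replace(hour=0, minute=0, second=0, microsecond=0)

# A timestamp shortly after UTC midnight lands in different partitions
# depending on which implementation the source uses.
ts = datetime(2021, 10, 27, 2, 30, tzinfo=timezone.utc)
assert days_utc(ts).day == 27     # partition for Oct 27
assert days_offset(ts).day == 26  # partition for Oct 26
```

This is exactly why the logical expression alone is not enough to decide whether two sources partition compatibly.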
Because the expressions are logical, we need to resolve them to implementations at some point, as Chao outlines. We can do that using a FunctionCatalog, although I think it's worth considering adding an interface so that a transform from a Table can be converted into a `BoundFunction` directly. That is easier than defining a way for Spark to query the function catalog. In any case, I'm sure it will be easy to understand how this works once there is a concrete implementation.

On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan <cloud0...@gmail.com> wrote:

> `BucketTransform` is a builtin partition transform in Spark, not a
> UDF from `FunctionCatalog`. Will Iceberg use a UDF from `FunctionCatalog` to
> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> I'm asking because other v2 sources may also use the builtin
> `BucketTransform` but with a different bucket hash function. Alternatively, we could
> clearly define the bucket hash function of the builtin `BucketTransform` in
> the doc.
>
> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue <b...@apache.org> wrote:
>
>> Two v2 sources may return different bucket IDs for the same value, and
>> this breaks the phase 1 split-wise join.
>>
>> This is why the FunctionCatalog included a canonicalName method (docs
>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
>> That method returns an identifier that can be used to compare whether two
>> bucket function instances are the same.
>>
>> 1. Can we apply this idea to partitioned file source tables
>> (non-bucketed) as well?
>>
>> What do you mean here? The design doc discusses transforms like days(ts)
>> that can be supported in the future. Is that what you're asking about? Or
>> are you referring to v1 file sources? I think the goal is to support v2,
>> since v1 doesn't have reliable behavior.
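The role of canonicalName in this comparison can be sketched roughly as follows (a toy Python model, not the actual `BoundFunction` API; the class and field names here are invented to mirror the idea):

```python
class BucketFunction:
    """Toy stand-in for a bound bucket function. canonical_name plays the
    role of BoundFunction.canonicalName() in the v2 FunctionCatalog API:
    an identifier that names the hash algorithm, not the instance."""
    def __init__(self, canonical_name, num_buckets, hash_fn):
        self.canonical_name = canonical_name
        self.num_buckets = num_buckets
        self.hash_fn = hash_fn

    def apply(self, value):
        return self.hash_fn(value) % self.num_buckets

def same_bucketing(f, g):
    # Two sources bucket compatibly only if they declare the same canonical
    # function and the same number of buckets. Comparing by the transform
    # name "bucket" alone is not enough: the underlying hash functions may
    # differ, which is exactly what breaks the phase 1 split-wise join.
    return f.canonical_name == g.canonical_name and f.num_buckets == g.num_buckets

iceberg = BucketFunction("iceberg.bucket", 8, hash)
other = BucketFunction("other.bucket", 8, lambda v: hash(v) * 31)
assert not same_bucketing(iceberg, other)
assert same_bucketing(iceberg, BucketFunction("iceberg.bucket", 8, hash))
```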
>>
>> Note that the initial implementation goal is to support bucketing, since
>> that's an easier case: both sides have the same number of
>> partitions. More complex storage-partitioned joins can be implemented later.
>>
>> 2. What if the table has many partitions? Shall we apply certain join
>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>> job to do so?
>>
>> I think that this proposal opens up a lot of possibilities, like what
>> you're suggesting here. It is a bit like AQE. We'll need to come up with
>> heuristics for choosing how and when to use storage partitioning in joins.
>> As I said above, bucketing is a great way to get started because it fills
>> an existing gap. More complex use cases can be supported over time.
>>
>> Ryan
>>
>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> IIUC, the general idea is to let each input split report its partition
>>> value, and Spark can perform the join in two phases:
>>> 1. join the input splits from the left and right tables according to their
>>> partition values and join keys, at the driver side.
>>> 2. for each joined pair of input splits (or group of splits), launch a
>>> Spark task to join the rows.
>>>
>>> My major concern is how to define "compatible partitions". Things
>>> like `days(ts)` are straightforward: the same timestamp value always
>>> results in the same partition value, in whatever v2 source. `bucket(col,
>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>>> sources may return different bucket IDs for the same value, and this breaks
>>> the phase 1 split-wise join.
>>>
>>> And two questions for further improvements:
>>> 1. Can we apply this idea to partitioned file source tables
>>> (non-bucketed) as well?
>>> 2. What if the table has many partitions? Shall we apply certain join
>>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>> job to do so?
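The two-phase idea described above can be sketched in a few lines (illustrative Python, not the proposed implementation; splits and partition values are simplified to plain tuples):

```python
from collections import defaultdict

def phase1_split_join(left_splits, right_splits):
    """Phase 1 (driver side): group each table's input splits by their
    reported partition value, then pair up matching groups. Each returned
    pair would become one phase-2 Spark task that joins the actual rows."""
    left_by_part = defaultdict(list)
    for part_value, split in left_splits:
        left_by_part[part_value].append(split)
    right_by_part = defaultdict(list)
    for part_value, split in right_splits:
        right_by_part[part_value].append(split)
    # Inner-join semantics: only partition values present on both sides
    # produce a task. This only works if both sides compute partition
    # values the same way ("compatible partitions").
    return {
        part: (left_by_part[part], right_by_part[part])
        for part in left_by_part if part in right_by_part
    }

left = [(0, "l-split-a"), (0, "l-split-b"), (1, "l-split-c")]
right = [(1, "r-split-x"), (2, "r-split-y")]
assert phase1_split_join(left, right) == {1: (["l-split-c"], ["r-split-x"])}
```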
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>
>>>> Thanks Cheng for the comments.
>>>>
>>>> > Is migrating the Hive table read path to data source v2 a
>>>> prerequisite of this SPIP?
>>>>
>>>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
>>>> Hive eventually moves to the V2 API. With that said, I think some of the
>>>> ideas could be useful for V1 Hive support as well. For instance, with the
>>>> newly proposed logic to compare whether the output partitionings from both
>>>> sides of a join operator are compatible, we could have HiveTableScanExec
>>>> report a partitioning other than HashPartitioning, and
>>>> EnsureRequirements could potentially recognize that and therefore avoid a
>>>> shuffle if both sides report the same compatible partitioning. In addition,
>>>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>>>> the constraint for V1 bucket join so that the join keys need not
>>>> be identical to the bucket keys.
>>>>
>>>> > Would aggregate work automatically after the SPIP?
>>>>
>>>> Yes, it will work as before. This case is already supported by
>>>> DataSourcePartitioning in V2 (see SPARK-22389).
>>>>
>>>> > Any major use cases in mind except Hive bucketed tables?
>>>>
>>>> Our first use case is Apache Iceberg. In addition, we also want
>>>> to add support for Spark's built-in file data sources.
>>>>
>>>> Thanks,
>>>> Chao
>>>>
>>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>>>>
>>>>> +1 for this. This is an exciting step toward efficiently reading bucketed
>>>>> tables from other systems (Hive, Trino & Presto)!
>>>>>
>>>>> Still looking at the details, but I have some early questions:
>>>>>
>>>>> 1. Is migrating the Hive table read path to data source v2 a
>>>>> prerequisite of this SPIP?
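The SPARK-35703 relaxation Chao mentions can be illustrated roughly like this (a simplified Python check, not Spark's actual EnsureRequirements logic; the function name is invented):

```python
def bucket_join_allowed(join_keys, bucket_keys):
    """Compare the old strict rule with the relaxed one. Before the change,
    the join keys had to be exactly the bucket keys; after it, it is enough
    that every bucket key appears among the join keys, since rows that agree
    on all join keys necessarily agree on the bucket keys and therefore land
    in matching buckets on both sides."""
    strict = list(join_keys) == list(bucket_keys)        # old rule
    relaxed = set(bucket_keys).issubset(set(join_keys))  # relaxed rule
    return strict, relaxed

# Joining on (a, b) against tables bucketed on (a): previously required a
# shuffle, now eligible for bucket join.
assert bucket_join_allowed(["a", "b"], ["a"]) == (False, True)
```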
>>>>>
>>>>> The Hive table read path is currently a mix of data source v1 (for Parquet
>>>>> & ORC file formats only) and the legacy Hive code path (HiveTableScanExec). In
>>>>> the SPIP, I see we only make changes for data source v2, so I am wondering
>>>>> how this would work with the existing Hive table read path. In addition, just
>>>>> FYI, support for writing Hive bucketed tables was merged into master recently (
>>>>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has
>>>>> details).
>>>>>
>>>>> 2. Would aggregate work automatically after the SPIP?
>>>>>
>>>>> Another major benefit of having bucketed tables is avoiding the shuffle
>>>>> before an aggregate. I just want to bring to our attention that it would be
>>>>> great to consider aggregates as well in this proposal.
>>>>>
>>>>> 3. Any major use cases in mind except Hive bucketed tables?
>>>>>
>>>>> Just curious if there are any other use cases we are targeting as part
>>>>> of the SPIP.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Cheng Su
>>>>>
>>>>> *From: *Ryan Blue <b...@apache.org>
>>>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>>>>> *To: *John Zhuge <jzh...@apache.org>
>>>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>>>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun
>>>>> <dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>,
>>>>> Wenchen Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>,
>>>>> dev <dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>>>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data
>>>>> Source V2
>>>>>
>>>>> Instead of commenting on the doc, could we keep the discussion here on the
>>>>> dev list please? That way more people can follow it, and there is more room
>>>>> for discussion. Comment threads have a very small area and easily become
>>>>> hard to follow.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>>>>
>>>>> +1 Nicely done!
>>>>>
>>>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>>>
>>>>> Oops, sorry. I just fixed the permission setting.
>>>>>
>>>>> Thanks everyone for the positive support!
>>>>>
>>>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>
>>>>> +1 to this SPIP, and nice writeup of the design doc!
>>>>>
>>>>> Can we open comment permission in the doc so that we can discuss
>>>>> details there?
>>>>>
>>>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>>>>>
>>>>> Makes sense to me.
>>>>>
>>>>> It would be great to have some feedback from people such as @Wenchen Fan
>>>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
>>>>> <angers....@gmail.com>.
>>>>>
>>>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com> wrote:
>>>>>
>>>>> +1 for this SPIP.
>>>>>
>>>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com> wrote:
>>>>>
>>>>> +1. Thanks for lifting the current restrictions on bucket join and
>>>>> making this more general.
>>>>>
>>>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>>>>
>>>>> +1 from me as well. Thanks Chao for doing so much to get it to this
>>>>> point!
>>>>>
>>>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>>>>
>>>>> +1 on this SPIP.
>>>>>
>>>>> This is a more generalized version of bucketed tables and bucketed
>>>>> joins, which can eliminate very expensive data shuffles when joining, and
>>>>> many users in the Apache Spark community have wanted this feature for
>>>>> a long time!
>>>>>
>>>>> Thank you, Ryan and Chao, for working on this, and I look forward to
>>>>> it as a new feature in Spark 3.3.
>>>>>
>>>>> DB Tsai | https://www.dbtsai.com/ | PGP 42E5B25A8F7A82C1
>>>>>
>>>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Ryan and I drafted a design doc to support a new type of join:
>>>>> storage partitioned join, which covers bucket join support for DataSourceV2
>>>>> but is more general. The goal is to let Spark leverage distribution
>>>>> properties reported by data sources and eliminate shuffles whenever
>>>>> possible.
>>>>> >
>>>>> > Design doc:
>>>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>>>>> (includes a POC link at the end)
>>>>> >
>>>>> > We'd like to start a discussion on the doc, and any feedback is
>>>>> welcome!
>>>>> >
>>>>> > Thanks,
>>>>> > Chao
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>>
>>>>> --
>>>>> John Zhuge
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>
>>
>> --
>> Ryan Blue
>>
>
--
Ryan Blue