Hi Timo,

Gotcha, let's use the passive voice. I am actually thinking about "BUCKETED BY 6" or "BUCKETED INTO 6".
Not really used in SQL, but afaiu Spark uses the concept [1].

[1] https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html

Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jing,
>
> > Have you considered using BUCKET BY directly?
>
> Which vendor uses this syntax? Most vendors that I checked call this concept "distribution".
>
> In any case, the "BY" is optional, so certain DDL statements would declare it like "BUCKET INTO 6 BUCKETS"? And following PARTITIONED, we should use the passive voice.
>
> > Did you mean users can use their own algorithm? How to do it?
>
> "Own algorithm" only refers to deciding between a list of partitioning strategies (namely hash and range partitioning) if the connector offers more than one.
>
> Regards,
> Timo
>
> On 30.10.23 12:39, Jing Ge wrote:
> > Hi Timo,
> >
> > The FLIP looks great! Thanks for bringing it to our attention! In order to make sure we are on the same page, I would like to ask some questions:
> >
> > 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which Benchao mentioned and which is used to distribute rows among reducers, i.e. focusing on the shuffle during the computation. The FLIP is focusing more on storage, if I am not mistaken. Have you considered using BUCKET BY directly?
> >
> > 2. According to the FLIP:
> >
> > "CREATE TABLE MyTable (uid BIGINT, name STRING)
> > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >
> > - For advanced users, the algorithm can be defined explicitly.
> > - Currently, either HASH() or RANGE()."
> >
> > Did you mean users can use their own algorithm? How to do it?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org> wrote:
> >
> >> Let me reply to the feedback from Yunfan:
> >>
> >> > Distribute by in DML is also supported by Hive
> >>
> >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions.
> >> This discussion is about DDL. For DDL, we have more freedom, as every vendor has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly connected to the connector implementation, not the engine. However, for DML we need to watch out for standard compliance and introduce changes with high caution.
> >>
> >> How a LookupTableSource interprets the DISTRIBUTED BY is connector-dependent in my opinion. In general, this FLIP is a sink ability, but we could have a follow-up FLIP that helps in distributing the load of lookup joins.
> >>
> >> > to avoid data skew problem
> >>
> >> I understand the use case and that it is important to solve it eventually. A solution might be to introduce helper Polymorphic Table Functions [1] in the future instead of new syntax.
> >>
> >> [1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>
> >> Let me reply to the feedback from Benchao:
> >>
> >> > Do you think it's useful to add some extensibility for the hash strategy
> >>
> >> The hash strategy is fully determined by the connector, not the Flink SQL engine. We are not using Flink's hash strategy in any way. If the hash strategy for the regular Flink file system connector should be changed, this should be expressed via a config option. Otherwise we should offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 30.10.23 10:44, Timo Walther wrote:
> >>> Hi Jark,
> >>>
> >>> My intention was to avoid too complex syntax in the first version. In the past years, we could enable use cases also without this clause, so we should be careful with overloading it with too much functionality in the first version. We can still iterate on it later; the interfaces are flexible enough to support more in the future.
> >>>
> >>> I agree that maybe an explicit HASH and RANGE doesn't harm. Also making the bucket number optional.
> >>>
> >>> I updated the FLIP accordingly. Now the SupportsBucketing interface declares more methods that help in validation and in providing helpful error messages to users.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 27.10.23 10:20, Jark Wu wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Thanks for starting this discussion. I really like it! The FLIP is already in good shape; I only have some minor comments.
> >>>>
> >>>> 1. Could we also support the HASH and RANGE distribution kinds in the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax.
> >>>>
> >>>> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. For example, StarRocks [1] and Paimon [2].
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>> [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >>>> [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>>>
> >>>> On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>
> >>>>> Thanks a lot, Timo, for starting this discussion.
> >>>>>
> >>>>> Big +1 for this.
> >>>>>
> >>>>> The design looks good to me!
> >>>>>
> >>>>> We can add some documentation for connector developers. For example: for sinks, if a keyBy is needed, the connector itself should perform the keyBy. SupportsBucketing is just a marker interface.
> >>>>>
> >>>>> Best,
> >>>>> Jingsong
> >>>>>
> >>>>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1].
> >>>>>>
> >>>>>> Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering. This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink.
> >>>>>>
> >>>>>> This is a pure connector characteristic and helps both Apache Kafka and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax.
> >>>>>>
> >>>>>> Here is an example:
> >>>>>>
> >>>>>> CREATE TABLE MyTable
> >>>>>>   (
> >>>>>>     uid BIGINT,
> >>>>>>     name STRING
> >>>>>>   )
> >>>>>>   DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >>>>>>   WITH (
> >>>>>>     'connector' = 'kafka'
> >>>>>>   )
> >>>>>>
> >>>>>> The full syntax specification can be found in the document. The clause should be optional and fully backwards compatible.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
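
To summarize the DDL variants discussed in this thread, the syntax would look roughly as follows. This is a sketch based on the discussion above, not the final grammar; the authoritative specification is in FLIP-376, and table/column names are only illustrative:

```sql
-- Implicit distribution algorithm, fixed bucket count (the FLIP example):
CREATE TABLE MyTable (uid BIGINT, name STRING)
  DISTRIBUTED BY (uid) INTO 6 BUCKETS
  WITH ('connector' = 'kafka');

-- Explicit algorithm for advanced users, currently either HASH or RANGE:
CREATE TABLE MyTable (uid BIGINT, name STRING)
  DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
  WITH ('connector' = 'kafka');

-- Optional bucket count, per Jark's suggestion: the connector determines
-- the number of buckets itself (cf. the StarRocks/Paimon links above):
CREATE TABLE MyTable (uid BIGINT, name STRING)
  DISTRIBUTED BY (uid)
  WITH ('connector' = 'kafka');
```

Which algorithms and whether the bucket count may be omitted is validated per connector via the SupportsBucketing interface mentioned above.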