Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check for bucketing, and in which cases does it throw
exceptions? For example, do we check the interfaces when executing
CREATE/ALTER DDL, and when the table is used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther <twal...@apache.org> wrote:

> Hi Jing,
>
>  > Have you considered using BUCKET BY directly?
>
> Which vendor uses this syntax? Most vendors that I checked call this
> concept "distribution".
>
> In any case, the "BY" is optional, so certain DDL statements would
> declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
> we should use the passive voice.
>
>  > Did you mean users can use their own algorithm? How to do it?
>
> "own algorithm" only refers to deciding between a list of partitioning
> strategies (namely hash and range partitioning) if the connector offers
> more than one.
>
> Regards,
> Timo
>
>
> On 30.10.23 12:39, Jing Ge wrote:
> > Hi Timo,
> >
> > The FLIP looks great! Thanks for bringing it to our attention! In order to
> > make sure we are on the same page, I would like to ask some questions:
> >
> > 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, as Benchao
> > mentioned, which is used to distribute rows among reducers, i.e. focusing
> > on the shuffle during the computation. The FLIP is focusing more on
> > storage, if I am not mistaken. Have you considered using BUCKET BY directly?
> >
> > 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING)
> > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >
> >     - For advanced users, the algorithm can be defined explicitly.
> >     - Currently, either HASH() or RANGE().
> >
> > "
> > Did you mean users can use their own algorithm? How to do it?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org> wrote:
> >
> >> Let me reply to the feedback from Yunfan:
> >>
> >>   > Distribute by in DML is also supported by Hive
> >>
> >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> >> discussion is about DDL. For DDL, we have more freedom as every vendor
> >> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> >> connected to the connector implementation, not the engine. However, for
> >> DML we need to watch out for standard compliance and introduce changes
> >> with high caution.
> >>
> >> How a LookupTableSource interprets the DISTRIBUTED BY is
> >> connector-dependent in my opinion. In general this FLIP is a sink
> >> ability, but we could have a follow-up FLIP that helps in distributing load
> >> of lookup joins.
> >>
> >>   > to avoid data skew problem
> >>
> >> I understand the use case and that it is important to solve it
> >> eventually. Maybe a solution might be to introduce helper Polymorphic
> >> Table Functions [1] in the future instead of new syntax.
> >>
> >> [1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>
> >>
> >> Let me reply to the feedback from Benchao:
> >>
> >>   > Do you think it's useful to add some extensibility for the hash strategy
> >>
> >> The hash strategy is fully determined by the connector, not the Flink
> >> SQL engine. We are not using Flink's hash strategy in any way. If the
> >> hash strategy for the regular Flink file system connector should be
> >> changed, this should be expressed via a config option. Otherwise we should
> >> offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 10:44, Timo Walther wrote:
> >>> Hi Jark,
> >>>
> >>> my intention was to avoid overly complex syntax in the first version. In
> >>> the past years, we could enable use cases also without this clause, so
> >>> we should be careful with overloading it with too much functionality in
> >>> the first version. We can still iterate on it later; the interfaces are
> >>> flexible enough to support more in the future.
> >>>
> >>> I agree that an explicit HASH and RANGE doesn't hurt, and neither does
> >>> making the bucket number optional.
> >>>
> >>> I updated the FLIP accordingly. Now the SupportsBucketing interface
> >>> declares more methods that help in validation and providing helpful error
> >>> messages to users.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 27.10.23 10:20, Jark Wu wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Thanks for starting this discussion. I really like it!
> >>>> The FLIP is already in good shape, I only have some minor comments.
> >>>>
> >>>> 1. Could we also support the HASH and RANGE distribution kinds in the DDL
> >>>> syntax? I noticed that HASH and UNKNOWN are introduced in the Java API,
> >>>> but not in the syntax.
> >>>>
> >>>> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
> >>>> Some storage engines support automatically determining the bucket number
> >>>> based on the cluster resources and data size of the table. For example,
> >>>> StarRocks[1] and Paimon[2].
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>> [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >>>> [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>>>
> >>>> On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>
> >>>>> Many thanks, Timo, for starting this discussion.
> >>>>>
> >>>>> Big +1 for this.
> >>>>>
> >>>>> The design looks good to me!
> >>>>>
> >>>>> We can add some documentation for connector developers. For example:
> >>>>> for a sink, if a keyBy is needed, the connector should perform the keyBy
> >>>>> itself. SupportsBucketing is just a marker interface.
> >>>>>
> >>>>> Best,
> >>>>> Jingsong
> >>>>>
> >>>>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> >>>>>> clause [1].
> >>>>>>
> >>>>>> Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> >>>>>> Clustering. This FLIP continues the work of previous FLIPs and would
> >>>>>> like to introduce the concept of "Bucketing" to Flink.
> >>>>>>
> >>>>>> This is a pure connector characteristic and helps both Apache Kafka and
> >>>>>> Apache Paimon connectors in avoiding a complex WITH clause by providing
> >>>>>> improved syntax.
> >>>>>>
> >>>>>> Here is an example:
> >>>>>>
> >>>>>> CREATE TABLE MyTable
> >>>>>>      (
> >>>>>>        uid BIGINT,
> >>>>>>        name STRING
> >>>>>>      )
> >>>>>>      DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >>>>>>      WITH (
> >>>>>>        'connector' = 'kafka'
> >>>>>>      )
> >>>>>>
> >>>>>> The full syntax specification can be found in the document. The clause
> >>>>>> should be optional and fully backwards compatible.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
