Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This discussion is about DDL. For DDL, we have more freedom because every vendor has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly coupled to the connector implementation, not the engine. For DML, however, we need to watch out for standard compliance and introduce changes with great caution.
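
To make the distinction concrete (Hive's DISTRIBUTE BY is shown only for contrast, and the snippets are sketches, not exact grammar):

   -- DML (Hive): DISTRIBUTE BY redistributes rows within a single query
   SELECT uid, name FROM Orders DISTRIBUTE BY uid

   -- DDL (this FLIP): DISTRIBUTED BY declares a persistent table property
   -- that the connector interprets, e.g. as Kafka partitioning
   CREATE TABLE Orders (uid BIGINT, name STRING)
   DISTRIBUTED BY (uid) INTO 6 BUCKETS
   WITH ('connector' = 'kafka')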

How a LookupTableSource interprets DISTRIBUTED BY is connector-dependent in my opinion. In general, this FLIP is about a sink ability, but we could have a follow-up FLIP that helps with distributing the load of lookup joins.

> to avoid data skew problem

I understand the use case and that it is important to solve it eventually. A solution might be to introduce helper Polymorphic Table Functions [1] in the future instead of new syntax.
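
To sketch the idea (purely hypothetical; a function like REBALANCE does not exist today, the call style merely follows the SQL:2016 PTF syntax described in [1]):

   -- hypothetical PTF that redistributes skewed rows before a join
   SELECT *
   FROM TABLE(
     REBALANCE(
       input => TABLE(Orders),
       keys  => DESCRIPTOR(uid)
     )
   )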

[1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash strategy

The hash strategy is fully determined by the connector, not by the Flink SQL engine. We are not using Flink's hash strategy in any way. If the hash strategy of the regular Flink filesystem connector needs to be changed, this should be expressed via a config option. Otherwise, we should offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
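
For example, such an option could look like this (the option key 'sink.bucket-hash-function' is purely illustrative and does not exist today):

   CREATE TABLE T (uid BIGINT, name STRING)
   DISTRIBUTED BY (uid) INTO 4 BUCKETS
   WITH (
     'connector' = 'filesystem',
     -- hypothetical option selecting the connector-side hash function
     'sink.bucket-hash-function' = 'murmur3'
   )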

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:
Hi Jark,

my intention was to avoid overly complex syntax in the first version. In the past years, we could enable use cases without this clause as well, so we should be careful not to overload it with too much functionality initially. We can still iterate on it later; the interfaces are flexible enough to support more in the future.

I agree that explicit HASH and RANGE don't hurt. Making the bucket number optional also makes sense.

I updated the FLIP accordingly. The SupportsBucketing interface now declares more methods that help with validation and provide helpful error messages to users.
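
The updated syntax then allows, for example (a sketch; see the FLIP for the exact grammar):

   -- explicit algorithm with a fixed bucket count
   CREATE TABLE T1 (uid BIGINT, name STRING)
   DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
   WITH ('connector' = 'kafka')

   -- explicit RANGE, bucket count left to the connector
   CREATE TABLE T2 (uid BIGINT, name STRING)
   DISTRIBUTED BY RANGE(uid)
   WITH ('connector' = 'paimon')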

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:
Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support the HASH and RANGE distribution kinds in the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and the data size of the table, for example StarRocks [1] and Paimon [2].

Best,
Jark

[1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com> wrote:

Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example: for sinks, if a keyBy is required, the connector itself should perform the keyBy. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable (
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause



