Hi Yunfan and Benchao,

it seems the FLIP discussion thread got split into two parts. At least this is what I see in my mail program. I would kindly ask to answer in the other thread [1].

I will also reply there now to maintain the discussion link.

Regards,
Timo

[1] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1



On 28.10.23 10:34, Benchao Li wrote:
Thanks Timo for preparing the FLIP.

Regarding "By default, DISTRIBUTED BY assumes a list of columns for an
implicit hash partitioning."
Do you think it's useful to add some extensibility for the hash
strategy. One scenario I can foresee is if we write bucketed data into
Hive, and if Flink's hash strategy is different than Hive/Spark's,
then they could not utilize the bucketed data written by Flink. This
is the one case I met in production already, there may be more cases
like this that needs customize the hash strategy to accommodate with
existing systems.

yunfan zhang <yunfanfight...@gmail.com> 于2023年10月27日周五 19:06写道:

Distribute by in DML is also supported by Hive.
And it is also useful for flink.
Users can use this ability to increase cache hit rate in lookup join.
And users can use "distribute by key, rand(1, 10)” to avoid data skew problem.
And I think it is another way to solve this Flip204[1]
There is already has some people required this feature[2]

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://issues.apache.org/jira/browse/FLINK-27541

On 2023/10/27 08:20:25 Jark Wu wrote:
Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li <ji...@gmail.com> wrote:

Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <tw...@apache.org> wrote:

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
    (
      uid BIGINT,
      name STRING
    )
    DISTRIBUTED BY (uid) INTO 6 BUCKETS
    WITH (
      'connector' = 'kafka'
    )

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause






Reply via email to