Hi Jark,

here are the checks I had in mind so far. But we can also discuss this during the implementation in the PRs. Most of the tasks are very similar to PARTITIONED BY which is also a characteristic of a sink.

1) Check that DISTRIBUTED BY columns reference physical columns and at least 1. In DefaultSchemaResolver like we do for PARTITIONED BY. 2) Check that if DISTRIBUTED is defined the sink implements SupportsBucketing. In DynamicSinkUtils like we do for metadata columns.

Currently, for sources we would only check for semantical correctness (1) but not more. Like we do for PARTITIONED BY.

Do you have more checks in mind? Of course, during implementation I will make sure that all derived utils will properly work; including CREATE TABLE LIKE.

Regards,
Timo


On 31.10.23 07:22, Jark Wu wrote:
Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check and throw exceptions for the bucketing?
For example, do we check interfaces when executing create/alter
DDL and when used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther <twal...@apache.org> wrote:

Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:
Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order
to
make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao
mentioned
which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if
I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

     - For advanced users, the algorithm can be defined explicitly.
     - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org>
wrote:

Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]


https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:
Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and proving helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:
Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but
not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
TABLE?
Some storage engines support automatically determining the bucket
number
based on
the cluster resources and data size of the table. For example,
StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:


https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:


https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com>
wrote:

Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org>
wrote:

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka
and
Apache Paimon connectors in avoiding a complex WITH clause by
providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
      (
        uid BIGINT,
        name STRING
      )
      DISTRIBUTED BY (uid) INTO 6 BUCKETS
      WITH (
        'connector' = 'kafka'
      )

The full syntax specification can be found in the document. The
clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]



https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause










Reply via email to