If there are no objections, I would start with a voting on Monday.

Thanks for the feedback everyone!

Regards,
Timo


On 02.11.23 13:49, Martijn Visser wrote:
Hi all,

From a user point of view, I think it makes sense to go for
DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther <twal...@apache.org> wrote:

Hi Jing,

I agree this is confusing. THe Spark API calls it bucketBy in the
programmatic API. But anyway, we should discuss the SQL semantics here.
It's like a "WHERE" is called "filter" in the programmatic world. Or a
"SELECT" is called "projection" in code.

And looking at all the Hive tutorials[1], distributed by should be more
consistent. By using the "INTO n BUCKETS", we still include the
bucketing terminology in the syntax for better understanding.

If there are no other objections to this topic, I would still prefer to
go with DISTRIBUTED BY.

Regards,
Timo

[1]
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/



On 01.11.23 11:55, Jing Ge wrote:
Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther <twal...@apache.org> wrote:

Hi Jing,

   > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

   > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:
Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order
to
make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao
mentioned
which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if
I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

      - For advanced users, the algorithm can be defined explicitly.
      - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther <twal...@apache.org>
wrote:

Let me reply to the feedback from Yunfan:

    > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

    > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]


https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

    > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:
Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and proving helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:
Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but
not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
TABLE?
Some storage engines support automatically determining the bucket
number
based on
the cluster resources and data size of the table. For example,
StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:


https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:


https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li <jingsongl...@gmail.com>
wrote:

Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther <twal...@apache.org>
wrote:

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka
and
Apache Paimon connectors in avoiding a complex WITH clause by
providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
       (
         uid BIGINT,
         name STRING
       )
       DISTRIBUTED BY (uid) INTO 6 BUCKETS
       WITH (
         'connector' = 'kafka'
       )

The full syntax specification can be found in the document. The
clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]



https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause












Reply via email to