Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-03 Thread Jing Ge
Hi Timo,

Fair enough, thanks for the clarification!

Best regards,
Jing


On Fri, Nov 3, 2023 at 8:16 AM Timo Walther  wrote:

> If there are no objections, I would start a vote on Monday.
>
> Thanks for the feedback everyone!
>
> Regards,
> Timo
>
>
> On 02.11.23 13:49, Martijn Visser wrote:
> > Hi all,
> >
> > From a user point of view, I think it makes sense to go for
> > DISTRIBUTED BY with how Timo explained it. +1 for his proposal
> >
> > Best regards,
> >
> > Martijn
> >
> >
> > On Thu, Nov 2, 2023 at 11:00 AM Timo Walther  wrote:
> >>
> >> Hi Jing,
> >>
> >> I agree this is confusing. The Spark API calls it bucketBy in the
> >> programmatic API. But anyway, we should discuss the SQL semantics here.
> >> It's like a "WHERE" is called "filter" in the programmatic world. Or a
> >> "SELECT" is called "projection" in code.
> >>
> >> And looking at all the Hive tutorials[1], distributed by should be more
> >> consistent. By using the "INTO n BUCKETS", we still include the
> >> bucketing terminology in the syntax for better understanding.
> >>
> >> If there are no other objections to this topic, I would still prefer to
> >> go with DISTRIBUTED BY.
> >>
> >> Regards,
> >> Timo
> >>
> >> [1]
> >>
> https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/
> >>
> >>
> >>
> >> On 01.11.23 11:55, Jing Ge wrote:
> >>> Hi Timo,
> >>>
> >>> Gotcha, let's use passive verbs. I am actually thinking about
> "BUCKETED BY
> >>> 6" or "BUCKETED INTO 6".
> >>>
> >>> Not really used in SQL, but afaiu Spark uses the concept[1].
> >>>
> >>> [1]
> >>>
> https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html
> >>>
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Mon, Oct 30, 2023 at 5:25 PM Timo Walther 
> wrote:
> >>>
>  Hi Jing,
> 
> > Have you considered using BUCKET BY directly?
> 
>  Which vendor uses this syntax? Most vendors that I checked call this
>  concept "distribution".
> 
>  In any case, the "BY" is optional, so certain DDL statements would
>  declare it like "BUCKET INTO 6 BUCKETS"? And following the
> PARTITIONED,
>  we should use the passive voice.
> 
> > Did you mean users can use their own algorithm? How to do it?
> 
>  "own algorithm" only refers to deciding between a list of partitioning
>  strategies (namely hash and range partitioning) if the connector
> offers
>  more than one.
> 
>  Regards,
>  Timo
> 
> 
>  On 30.10.23 12:39, Jing Ge wrote:
> > Hi Timo,
> >
> > The FLIP looks great! Thanks for bringing it to our attention! In
> order
>  to
> > make sure we are on the same page, I would ask some questions:
> >
> > 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao
>  mentioned
> > which is used to distribute rows among reducers, i.e. focusing on the
> > shuffle during the computation. The FLIP is focusing more on
> storage, if
>  I
> > am not mistaken. Have you considered using BUCKET BY directly?
> >
> > 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
>  STRING)
> > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >
> >   - For advanced users, the algorithm can be defined explicitly.
> >   - Currently, either HASH() or RANGE().
> >
> > "
> > Did you mean users can use their own algorithm? How to do it?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 
>  wrote:
> >
> >> Let me reply to the feedback from Yunfan:
> >>
> >> > Distribute by in DML is also supported by Hive
> >>
> >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions.
> This
> >> discussion is about DDL. For DDL, we have more freedom as every
> vendor
> >> has custom syntax for CREATE TABLE clauses. Furthermore, this is
> tightly
> >> coupled to the connector implementation, not the engine. However,
> for
> >> DML we need to watch out for standard compliance and introduce
> changes
> >> with high caution.
> >>
> >> How a LookupTableSource interprets the DISTRIBUTED BY is
> >> connector-dependent in my opinion. In general this FLIP is a sink
> >> ability, but we could have a follow-up FLIP that helps in distributing
> load
> >> of lookup joins.
> >>
> >> > to avoid data skew problem
> >>
> >> I understand the use case and that it is important to solve it
> >> eventually. Maybe a solution might be to introduce helper
> Polymorphic
> >> Table Functions [1] in the future instead of new syntax.
> >>
> >> [1]
> >>
> >>
> 
> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>
> >>
> >> Let me reply to the feedback from Benchao:
> >>
> >> > Do you think it's useful to add some extensibility for the hash strategy

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-03 Thread Timo Walther

If there are no objections, I would start a vote on Monday.

Thanks for the feedback everyone!

Regards,
Timo


On 02.11.23 13:49, Martijn Visser wrote:

Hi all,


From a user point of view, I think it makes sense to go for

DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther  wrote:


Hi Jing,

I agree this is confusing. The Spark API calls it bucketBy in the
programmatic API. But anyway, we should discuss the SQL semantics here.
It's like a "WHERE" is called "filter" in the programmatic world. Or a
"SELECT" is called "projection" in code.

And looking at all the Hive tutorials[1], distributed by should be more
consistent. By using the "INTO n BUCKETS", we still include the
bucketing terminology in the syntax for better understanding.

If there are no other objections to this topic, I would still prefer to
go with DISTRIBUTED BY.

Regards,
Timo

[1]
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/
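
As a sketch for readers skimming the thread, the clause under discussion (taken from the FLIP example quoted later in this thread, with the RANGE variant written out analogously for illustration; the authoritative grammar is in the FLIP) would look roughly like:

```sql
-- Example from the FLIP: hash-distribute rows by uid into 6 buckets
CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS;

-- The RANGE strategy mentioned in the FLIP would presumably read analogously
-- (MyRangeTable is an invented name for illustration)
CREATE TABLE MyRangeTable (uid BIGINT, name STRING)
DISTRIBUTED BY RANGE(uid) INTO 6 BUCKETS;
```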



On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

   > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

   > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

  - For advanced users, the algorithm can be defined explicitly.
  - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

> to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-02 Thread Martijn Visser
Hi all,

From a user point of view, I think it makes sense to go for
DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther  wrote:
>
> Hi Jing,
>
> I agree this is confusing. The Spark API calls it bucketBy in the
> programmatic API. But anyway, we should discuss the SQL semantics here.
> It's like a "WHERE" is called "filter" in the programmatic world. Or a
> "SELECT" is called "projection" in code.
>
> And looking at all the Hive tutorials[1], distributed by should be more
> consistent. By using the "INTO n BUCKETS", we still include the
> bucketing terminology in the syntax for better understanding.
>
> If there are no other objections to this topic, I would still prefer to
> go with DISTRIBUTED BY.
>
> Regards,
> Timo
>
> [1]
> https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/
>
>
>
> On 01.11.23 11:55, Jing Ge wrote:
> > Hi Timo,
> >
> > Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
> > 6" or "BUCKETED INTO 6".
> >
> > Not really used in SQL, but afaiu Spark uses the concept[1].
> >
> > [1]
> > https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html
> >
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:
> >
> >> Hi Jing,
> >>
> >>   > Have you considered using BUCKET BY directly?
> >>
> >> Which vendor uses this syntax? Most vendors that I checked call this
> >> concept "distribution".
> >>
> >> In any case, the "BY" is optional, so certain DDL statements would
> >> declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
> >> we should use the passive voice.
> >>
> >>   > Did you mean users can use their own algorithm? How to do it?
> >>
> >> "own algorithm" only refers to deciding between a list of partitioning
> >> strategies (namely hash and range partitioning) if the connector offers
> >> more than one.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 12:39, Jing Ge wrote:
> >>> Hi Timo,
> >>>
> >>> The FLIP looks great! Thanks for bringing it to our attention! In order
> >> to
> >>> make sure we are on the same page, I would ask some questions:
> >>>
> >>> 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao
> >> mentioned
> >>> which is used to distribute rows among reducers, i.e. focusing on the
> >>> shuffle during the computation. The FLIP is focusing more on storage, if
> >> I
> >>> am not mistaken. Have you considered using BUCKET BY directly?
> >>>
> >>> 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
> >> STRING)
> >>> DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >>>
> >>>  - For advanced users, the algorithm can be defined explicitly.
> >>>  - Currently, either HASH() or RANGE().
> >>>
> >>> "
> >>> Did you mean users can use their own algorithm? How to do it?
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 
> >> wrote:
> >>>
>  Let me reply to the feedback from Yunfan:
> 
> > Distribute by in DML is also supported by Hive
> 
>  I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
>  discussion is about DDL. For DDL, we have more freedom as every vendor
>  has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
>  coupled to the connector implementation, not the engine. However, for
>  DML we need to watch out for standard compliance and introduce changes
>  with high caution.
> 
>  How a LookupTableSource interprets the DISTRIBUTED BY is
>  connector-dependent in my opinion. In general this FLIP is a sink
>  ability, but we could have a follow-up FLIP that helps in distributing load
>  of lookup joins.
> 
> > to avoid data skew problem
> 
>  I understand the use case and that it is important to solve it
>  eventually. Maybe a solution might be to introduce helper Polymorphic
>  Table Functions [1] in the future instead of new syntax.
> 
>  [1]
> 
> 
> >> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> 
> 
>  Let me reply to the feedback from Benchao:
> 
> > Do you think it's useful to add some extensibility for the hash
>  strategy
> 
>  The hash strategy is fully determined by the connector, not the Flink
>  SQL engine. We are not using Flink's hash strategy in any way. If the
>  hash strategy for the regular Flink file system connector should be
>  changed, this should be expressed via config option. Otherwise we should
>  offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> 
>  Regards,
>  Timo
> 
> 
>  On 30.10.23 10:44, Timo Walther wrote:
> > Hi Jark,
> >
> > my intention was to avoid too complex syntax in the first version. In
> > the past

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-02 Thread Timo Walther

Hi Jing,

I agree this is confusing. The Spark API calls it bucketBy in the
programmatic API. But anyway, we should discuss the SQL semantics here. 
It's like a "WHERE" is called "filter" in the programmatic world. Or a 
"SELECT" is called "projection" in code.


And looking at all the Hive tutorials[1], distributed by should be more 
consistent. By using the "INTO n BUCKETS", we still include the 
bucketing terminology in the syntax for better understanding.


If there are no other objections to this topic, I would still prefer to 
go with DISTRIBUTED BY.


Regards,
Timo

[1] 
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/ 




On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo
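
If I read the update correctly, with an explicit algorithm and an optional bucket count, forms roughly like the following would become valid (a hedged sketch only — table names are invented and the exact grammar is defined in the FLIP):

```sql
-- Algorithm omitted: the connector picks its default distribution strategy
CREATE TABLE T1 (uid BIGINT, name STRING)
DISTRIBUTED BY (uid) INTO 6 BUCKETS;

-- Bucket count omitted: the connector decides the number of buckets
CREATE TABLE T2 (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid);
```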


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-01 Thread Jing Ge
Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:

> Hi Jing,
>
>  > Have you considered using BUCKET BY directly?
>
> Which vendor uses this syntax? Most vendors that I checked call this
> concept "distribution".
>
> In any case, the "BY" is optional, so certain DDL statements would
> declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
> we should use the passive voice.
>
>  > Did you mean users can use their own algorithm? How to do it?
>
> "own algorithm" only refers to deciding between a list of partitioning
> strategies (namely hash and range partitioning) if the connector offers
> more than one.
>
> Regards,
> Timo
>
>
> On 30.10.23 12:39, Jing Ge wrote:
> > Hi Timo,
> >
> > The FLIP looks great! Thanks for bringing it to our attention! In order
> to
> > make sure we are on the same page, I would ask some questions:
> >
> > 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao
> mentioned
> > which is used to distribute rows among reducers, i.e. focusing on the
> > shuffle during the computation. The FLIP is focusing more on storage, if
> I
> > am not mistaken. Have you considered using BUCKET BY directly?
> >
> > 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
> STRING)
> > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >
> > - For advanced users, the algorithm can be defined explicitly.
> > - Currently, either HASH() or RANGE().
> >
> > "
> > Did you mean users can use their own algorithm? How to do it?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 
> wrote:
> >
> >> Let me reply to the feedback from Yunfan:
> >>
> >>   > Distribute by in DML is also supported by Hive
> >>
> >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> >> discussion is about DDL. For DDL, we have more freedom as every vendor
> >> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> >> coupled to the connector implementation, not the engine. However, for
> >> DML we need to watch out for standard compliance and introduce changes
> >> with high caution.
> >>
> >> How a LookupTableSource interprets the DISTRIBUTED BY is
> >> connector-dependent in my opinion. In general this FLIP is a sink
> >> ability, but we could have a follow-up FLIP that helps in distributing load
> >> of lookup joins.
> >>
> >>   > to avoid data skew problem
> >>
> >> I understand the use case and that it is important to solve it
> >> eventually. Maybe a solution might be to introduce helper Polymorphic
> >> Table Functions [1] in the future instead of new syntax.
> >>
> >> [1]
> >>
> >>
> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>
> >>
> >> Let me reply to the feedback from Benchao:
> >>
> >>   > Do you think it's useful to add some extensibility for the hash
> >> strategy
> >>
> >> The hash strategy is fully determined by the connector, not the Flink
> >> SQL engine. We are not using Flink's hash strategy in any way. If the
> >> hash strategy for the regular Flink file system connector should be
> >> changed, this should be expressed via config option. Otherwise we should
> >> offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 10:44, Timo Walther wrote:
> >>> Hi Jark,
> >>>
> >>> my intention was to avoid too complex syntax in the first version. In
> >>> the past years, we could enable use cases also without this clause, so
> >>> we should be careful with overloading it with too much functionality in the
> >>> first version. We can still iterate on it later, the interfaces are
> >>> flexible enough to support more in the future.
> >>>
> >>> I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
> >>> the bucket number optional.
> >>>
> >>> I updated the FLIP accordingly. Now the SupportsBucketing interface
> >>> declares more methods that help in validation and providing helpful error
> >>> messages to users.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 27.10.23 10:20, Jark Wu wrote:
>  Hi Timo,
> 
>  Thanks for starting this discussion. I really like it!
>  The FLIP is already in good shape, I only have some minor comments.
> 
>  1. Could we also support HASH and RANGE distribution kind on the DDL
>  syntax?
>  I noticed that HASH and UNKNOWN are introduced in the Java API, but
>  not in
>  the syntax.
> 
>  2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
> >> TABLE?
>  Some storage engines support automatically determining the bucket
> number
>  based

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-31 Thread Timo Walther

Hi Jark,

here are the checks I had in mind so far. But we can also discuss this 
during the implementation in the PRs. Most of the tasks are very similar 
to PARTITIONED BY which is also a characteristic of a sink.


1) Check that DISTRIBUTED BY columns reference physical columns, and that at 
least one column is given. In DefaultSchemaResolver, like we do for PARTITIONED BY.
2) Check that if DISTRIBUTED is defined the sink implements 
SupportsBucketing. In DynamicSinkUtils like we do for metadata columns.


Currently, for sources we would only check for semantic correctness 
(1) but not more. Like we do for PARTITIONED BY.


Do you have more checks in mind? Of course, during implementation I will 
make sure that all derived utils work properly, including CREATE 
TABLE LIKE.


Regards,
Timo
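
To make check (1) concrete, here is a hypothetical DDL that should be rejected during schema resolution, because the distribution key references a computed rather than a physical column (table and column names are invented for illustration):

```sql
-- Hypothetical: name_upper is a computed column, not a physical one,
-- so check (1) should reject this statement.
CREATE TABLE InvalidTable (
  uid BIGINT,
  name STRING,
  name_upper AS UPPER(name)
) DISTRIBUTED BY HASH(name_upper) INTO 4 BUCKETS;
```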


On 31.10.23 07:22, Jark Wu wrote:

Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check and throw exceptions for the bucketing?
For example, do we check interfaces when executing create/alter
DDL and when used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Jark Wu
Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check and throw exceptions for the bucketing?
For example, do we check interfaces when executing create/alter
DDL and when used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther  wrote:

> Hi Jing,
>
>  > Have you considered using BUCKET BY directly?
>
> Which vendor uses this syntax? Most vendors that I checked call this
> concept "distribution".
>
> In any case, the "BY" is optional, so certain DDL statements would
> declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
> we should use the passive voice.
>
>  > Did you mean users can use their own algorithm? How to do it?
>
> "own algorithm" only refers to deciding between a list of partitioning
> strategies (namely hash and range partitioning) if the connector offers
> more than one.
>
> Regards,
> Timo
>
>
> On 30.10.23 12:39, Jing Ge wrote:
> > Hi Timo,
> >
> > The FLIP looks great! Thanks for bringing it to our attention! In order
> to
> > make sure we are on the same page, I would ask some questions:
> >
> > 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive like Benchao
> mentioned
> > which is used to distribute rows among reducers, i.e. focusing on the
> > shuffle during the computation. The FLIP is focusing more on storage, if
> I
> > am not mistaken. Have you considered using BUCKET BY directly?
> >
> > 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name
> STRING)
> > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
> >
> > - For advanced users, the algorithm can be defined explicitly.
> > - Currently, either HASH() or RANGE().
> >
> > "
> > Did you mean users can use their own algorithm? How to do it?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 
> wrote:
> >
> >> Let me reply to the feedback from Yunfan:
> >>
> >>   > Distribute by in DML is also supported by Hive
> >>
> >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> >> discussion is about DDL. For DDL, we have more freedom as every vendor
> >> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> >> coupled to the connector implementation, not the engine. However, for
> >> DML we need to watch out for standard compliance and introduce changes
> >> with high caution.
> >>
> >> How a LookupTableSource interprets the DISTRIBUTED BY is
> >> connector-dependent in my opinion. In general this FLIP is a sink
> >> ability, but we could have a follow-up FLIP that helps in distributing load
> >> of lookup joins.
> >>
> >>   > to avoid data skew problem
> >>
> >> I understand the use case and that it is important to solve it
> >> eventually. A solution might be to introduce helper Polymorphic
> >> Table Functions [1] in the future instead of new syntax.
> >>
> >> [1]
> >>
> >>
> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
> >>
> >>
> >> Let me reply to the feedback from Benchao:
> >>
> >>   > Do you think it's useful to add some extensibility for the hash
> >> strategy
> >>
> >> The hash strategy is fully determined by the connector, not the Flink
> >> SQL engine. We are not using Flink's hash strategy in any way. If the
> >> hash strategy for the regular Flink file system connector should be
> >> changed, this should be expressed via config option. Otherwise we should
> >> offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 30.10.23 10:44, Timo Walther wrote:
> >>> Hi Jark,
> >>>
> >>> my intention was to avoid too complex syntax in the first version. In
> >>> the past years, we could enable use cases also without this clause, so
> >>> we should be careful with overloading it with too much functionality in the
> >>> first version. We can still iterate on it later, the interfaces are
> >>> flexible enough to support more in the future.
> >>>
> >>> I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
> >>> the bucket number optional.
> >>>
> >>> I updated the FLIP accordingly. Now the SupportsBucketing interface
> >>> declares more methods that help in validation and providing helpful error
> >>> messages to users.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 27.10.23 10:20, Jark Wu wrote:
>  Hi Timo,
> 
>  Thanks for starting this discussion. I really like it!
>  The FLIP is already in good shape, I only have some minor comments.
> 
>  1. Could we also support HASH and RANGE distribution kind on the DDL
>  syntax?
>  I noticed that HASH and UNKNOWN are introduced in the Java API, but
>  not in
>  the syntax.
> 
>  2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
> >> TABLE?
>  Some storage engines support automatically determining the bucket
> number
>  based on
> the cluster resources and data size of the table.

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jing,

> Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this 
concept "distribution".


In any case, the "BY" is optional, so certain DDL statements would 
declare it like "BUCKET INTO 6 BUCKETS"? And following PARTITIONED BY,
we should use the passive voice.


> Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning 
strategies (namely hash and range partitioning) if the connector offers 
more than one.


Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order to
make sure we are on the same page, I would like to ask some questions:

1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which Benchao mentioned
and which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther  wrote:


Let me reply to the feedback from Yunfan:

  > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.

  > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. A solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]

https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

  > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and providing helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but
not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?

Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example,
StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:


https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets

[2]:


https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket


On Thu, 26 Oct 2023 at 18:26, Jingsong Li 

wrote:



Many thanks, Timo, for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector should perform the keyBy
itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther 

wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. Thi

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Jing Ge
Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order to
make sure we are on the same page, I would like to ask some questions:

1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which Benchao mentioned
and which is used to distribute rows among reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

   - For advanced users, the algorithm can be defined explicitly.
   - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther  wrote:

> Let me reply to the feedback from Yunfan:
>
>  > Distribute by in DML is also supported by Hive
>
> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
> discussion is about DDL. For DDL, we have more freedom as every vendor
> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
> coupled to the connector implementation, not the engine. However, for
> DML we need to watch out for standard compliance and introduce changes
> with high caution.
>
> How a LookupTableSource interprets the DISTRIBUTED BY is
> connector-dependent in my opinion. In general this FLIP is a sink
> ability, but we could have a follow-up FLIP that helps in distributing load
> of lookup joins.
>
>  > to avoid data skew problem
>
> I understand the use case and that it is important to solve it
> eventually. A solution might be to introduce helper Polymorphic
> Table Functions [1] in the future instead of new syntax.
>
> [1]
>
> https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf
>
>
> Let me reply to the feedback from Benchao:
>
>  > Do you think it's useful to add some extensibility for the hash
> strategy
>
> The hash strategy is fully determined by the connector, not the Flink
> SQL engine. We are not using Flink's hash strategy in any way. If the
> hash strategy for the regular Flink file system connector should be
> changed, this should be expressed via config option. Otherwise we should
> offer a dedicated `hive-filesystem` or `spark-filesystem` connector.
>
> Regards,
> Timo
>
>
> On 30.10.23 10:44, Timo Walther wrote:
> > Hi Jark,
> >
> > my intention was to avoid too complex syntax in the first version. In
> > the past years, we could enable use cases also without this clause, so
> > we should be careful with overloading it with too much functionality in the
> > first version. We can still iterate on it later, the interfaces are
> > flexible enough to support more in the future.
> >
> > I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
> > the bucket number optional.
> >
> > I updated the FLIP accordingly. Now the SupportsBucketing interface
> > declares more methods that help in validation and providing helpful error
> > messages to users.
> >
> > Let me know what you think.
> >
> > Regards,
> > Timo
> >
> >
> > On 27.10.23 10:20, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks for starting this discussion. I really like it!
> >> The FLIP is already in good shape, I only have some minor comments.
> >>
> >> 1. Could we also support HASH and RANGE distribution kind on the DDL
> >> syntax?
> >> I noticed that HASH and UNKNOWN are introduced in the Java API, but
> >> not in
> >> the syntax.
> >>
> >> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER
> TABLE?
> >> Some storage engines support automatically determining the bucket number
> >> based on
> >> the cluster resources and data size of the table. For example,
> >> StarRocks[1]
> >> and Paimon[2].
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> >> [2]:
> >>
> https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >>
> >> On Thu, 26 Oct 2023 at 18:26, Jingsong Li 
> wrote:
> >>
> >>> Many thanks, Timo, for starting this discussion.
> >>>
> >>> Big +1 for this.
> >>>
> >>> The design looks good to me!
> >>>
> >>> We can add some documentation for connector developers. For example:
> >>> for sinks, if a keyBy is needed, the connector should perform the keyBy
> >>> itself. SupportsBucketing is just a marker interface.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther 
> wrote:
> 
>  Hi everyone,
> 
>  I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
>  clause [1].
> 
>  Many SQL vendors expose the concepts of Partitioning, Bucketing, and
>  Clustering. This FLIP continues the work of previous FLIPs and would
>  like to introduce the concept of "Bucketing" to Flink.
> 
>  This is a pure connector characteristic and helps both Apache Kafka
> and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax.

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This 
discussion is about DDL. For DDL, we have more freedom as every vendor 
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly 
coupled to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes 
with high caution.


How a LookupTableSource interprets the DISTRIBUTED BY is 
connector-dependent in my opinion. In general this FLIP is a sink 
ability, but we could have a follow-up FLIP that helps in distributing load
of lookup joins.


> to avoid data skew problem

I understand the use case and that it is important to solve it 
eventually. A solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.


[1] 
https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink 
SQL engine. We are not using Flink's hash strategy in any way. If the 
hash strategy for the regular Flink file system connector should be 
changed, this should be expressed via config option. Otherwise we should 
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.


Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and providing helpful error
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Many thanks, Timo, for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector should perform the keyBy
itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
    (
  uid BIGINT,
  name STRING
    )
    DISTRIBUTED BY (uid) INTO 6 BUCKETS
    WITH (
  'connector' = 'kafka'
    )

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause









Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Yunfan and Benchao,

it seems the FLIP discussion thread got split into two parts. At least 
this is what I see in my mail program. I would kindly ask you to answer in
the other thread [1].


I will also reply there now to maintain the discussion link.

Regards,
Timo

[1] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1



On 28.10.23 10:34, Benchao Li wrote:

Thanks Timo for preparing the FLIP.

Regarding "By default, DISTRIBUTED BY assumes a list of columns for an
implicit hash partitioning."
Do you think it's useful to add some extensibility for the hash
strategy? One scenario I can foresee is if we write bucketed data into
Hive, and if Flink's hash strategy is different from Hive/Spark's,
then Hive/Spark could not utilize the bucketed data written by Flink. This
is a case I have already met in production; there may be more cases
like this that need to customize the hash strategy to accommodate
existing systems.

yunfan zhang wrote on Fri, Oct 27, 2023 at 19:06:


Distribute by in DML is also supported by Hive,
and it is also useful for Flink.
Users can use this ability to increase the cache hit rate in lookup joins,
and they can use "distribute by key, rand(1, 10)" to avoid data skew problems.
I think it is also another way to solve FLIP-204 [1].
Some people have already requested this feature [2].

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://issues.apache.org/jira/browse/FLINK-27541

On 2023/10/27 08:20:25 Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Many thanks, Timo, for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector should perform the keyBy
itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause











Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too much functionality in the
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and providing helpful error
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Many thanks, Timo, for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector should perform the keyBy
itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause







Re: Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-28 Thread Benchao Li
Thanks Timo for preparing the FLIP.

Regarding "By default, DISTRIBUTED BY assumes a list of columns for an
implicit hash partitioning."
Do you think it's useful to add some extensibility for the hash
strategy? One scenario I can foresee is if we write bucketed data into
Hive, and if Flink's hash strategy is different from Hive/Spark's,
then Hive/Spark could not utilize the bucketed data written by Flink. This
is a case I have already met in production; there may be more cases
like this that need to customize the hash strategy to accommodate
existing systems.

yunfan zhang wrote on Fri, Oct 27, 2023 at 19:06:
>
> Distribute by in DML is also supported by Hive,
> and it is also useful for Flink.
> Users can use this ability to increase the cache hit rate in lookup joins,
> and they can use "distribute by key, rand(1, 10)" to avoid data skew problems.
> I think it is also another way to solve FLIP-204 [1].
> Some people have already requested this feature [2].
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> [2] https://issues.apache.org/jira/browse/FLINK-27541
>
> On 2023/10/27 08:20:25 Jark Wu wrote:
> > Hi Timo,
> >
> > Thanks for starting this discussion. I really like it!
> > The FLIP is already in good shape, I only have some minor comments.
> >
> > 1. Could we also support HASH and RANGE distribution kind on the DDL
> > syntax?
> > I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
> > the syntax.
> >
> > 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
> > Some storage engines support automatically determining the bucket number
> > based on
> > the cluster resources and data size of the table. For example, StarRocks[1]
> > and Paimon[2].
> >
> > Best,
> > Jark
> >
> > [1]:
> > https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> > [2]:
> > https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> >
> > On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:
> >
> > > Many thanks, Timo, for starting this discussion.
> > >
> > > Big +1 for this.
> > >
> > > The design looks good to me!
> > >
> > > We can add some documentation for connector developers. For example:
> > > for sinks, if a keyBy is needed, the connector should perform the keyBy
> > > itself. SupportsBucketing is just a marker interface.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> > > > clause [1].
> > > >
> > > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> > > > Clustering. This FLIP continues the work of previous FLIPs and would
> > > > like to introduce the concept of "Bucketing" to Flink.
> > > >
> > > > This is a pure connector characteristic and helps both Apache Kafka and
> > > > Apache Paimon connectors in avoiding a complex WITH clause by providing
> > > > improved syntax.
> > > >
> > > > Here is an example:
> > > >
> > > > CREATE TABLE MyTable
> > > >(
> > > >  uid BIGINT,
> > > >  name STRING
> > > >)
> > > >DISTRIBUTED BY (uid) INTO 6 BUCKETS
> > > >WITH (
> > > >  'connector' = 'kafka'
> > > >)
> > > >
> > > > The full syntax specification can be found in the document. The clause
> > > > should be optional and fully backwards compatible.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > > [1]
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > >
> >



-- 

Best,
Benchao Li


RE: Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-27 Thread yunfan zhang
Distribute by in DML is also supported by Hive,
and it is also useful for Flink.
Users can use this ability to increase the cache hit rate in lookup joins,
and they can use "distribute by key, rand(1, 10)" to avoid data skew problems.
I think it is also another way to solve FLIP-204 [1].
Some people have already requested this feature [2].

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://issues.apache.org/jira/browse/FLINK-27541
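
For illustration, the salting trick above can be sketched in Hive-style DML
(using the rand(1, 10) notation from this message; `src` and `sink_table`
are hypothetical names):

```sql
-- Spread a hot key over up to 10 reducers by salting the distribution key:
INSERT OVERWRITE TABLE sink_table
SELECT *
FROM src
DISTRIBUTE BY key, rand(1, 10);
```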

On 2023/10/27 08:20:25 Jark Wu wrote:
> Hi Timo,
> 
> Thanks for starting this discussion. I really like it!
> The FLIP is already in good shape, I only have some minor comments.
> 
> 1. Could we also support HASH and RANGE distribution kind on the DDL
> syntax?
> I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
> the syntax.
> 
> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
> Some storage engines support automatically determining the bucket number
> based on
> the cluster resources and data size of the table. For example, StarRocks[1]
> and Paimon[2].
> 
> Best,
> Jark
> 
> [1]:
> https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
> [2]:
> https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
> 
> On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:
> 
> > Many thanks, Timo, for starting this discussion.
> >
> > Big +1 for this.
> >
> > The design looks good to me!
> >
> > We can add some documentation for connector developers. For example:
> > for sinks, if a keyBy is needed, the connector should perform the keyBy
> > itself. SupportsBucketing is just a marker interface.
> >
> > Best,
> > Jingsong
> >
> > On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:
> > >
> > > Hi everyone,
> > >
> > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> > > clause [1].
> > >
> > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> > > Clustering. This FLIP continues the work of previous FLIPs and would
> > > like to introduce the concept of "Bucketing" to Flink.
> > >
> > > This is a pure connector characteristic and helps both Apache Kafka and
> > > Apache Paimon connectors in avoiding a complex WITH clause by providing
> > > improved syntax.
> > >
> > > Here is an example:
> > >
> > > CREATE TABLE MyTable
> > >(
> > >  uid BIGINT,
> > >  name STRING
> > >)
> > >DISTRIBUTED BY (uid) INTO 6 BUCKETS
> > >WITH (
> > >  'connector' = 'kafka'
> > >)
> > >
> > > The full syntax specification can be found in the document. The clause
> > > should be optional and fully backwards compatible.
> > >
> > > Regards,
> > > Timo
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> >
> 

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-27 Thread Jark Wu
Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket
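
A sketch of the optional form (assuming "INTO n BUCKETS" can simply be
omitted; the concrete grammar is up to the FLIP):

```sql
-- No "INTO n BUCKETS": the storage engine determines the bucket number,
-- e.g. from cluster resources and data size:
CREATE TABLE MyTable (uid BIGINT, name STRING)
  DISTRIBUTED BY HASH(uid)
  WITH ('connector' = 'kafka');
```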

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:

> Many thanks, Timo, for starting this discussion.
>
> Big +1 for this.
>
> The design looks good to me!
>
> We can add some documentation for connector developers. For example:
> for sinks, if a keyBy is needed, the connector should perform the keyBy
> itself. SupportsBucketing is just a marker interface.
>
> Best,
> Jingsong
>
> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:
> >
> > Hi everyone,
> >
> > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> > clause [1].
> >
> > Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> > Clustering. This FLIP continues the work of previous FLIPs and would
> > like to introduce the concept of "Bucketing" to Flink.
> >
> > This is a pure connector characteristic and helps both Apache Kafka and
> > Apache Paimon connectors in avoiding a complex WITH clause by providing
> > improved syntax.
> >
> > Here is an example:
> >
> > CREATE TABLE MyTable
> >(
> >  uid BIGINT,
> >  name STRING
> >)
> >DISTRIBUTED BY (uid) INTO 6 BUCKETS
> >WITH (
> >  'connector' = 'kafka'
> >)
> >
> > The full syntax specification can be found in the document. The clause
> > should be optional and fully backwards compatible.
> >
> > Regards,
> > Timo
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>


Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-26 Thread Jingsong Li
Many thanks, Timo, for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sinks, if a keyBy is needed, the connector should perform the keyBy
itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:
>
> Hi everyone,
>
> I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
> clause [1].
>
> Many SQL vendors expose the concepts of Partitioning, Bucketing, and
> Clustering. This FLIP continues the work of previous FLIPs and would
> like to introduce the concept of "Bucketing" to Flink.
>
> This is a pure connector characteristic and helps both Apache Kafka and
> Apache Paimon connectors in avoiding a complex WITH clause by providing
> improved syntax.
>
> Here is an example:
>
> CREATE TABLE MyTable
>(
>  uid BIGINT,
>  name STRING
>)
>DISTRIBUTED BY (uid) INTO 6 BUCKETS
>WITH (
>  'connector' = 'kafka'
>)
>
> The full syntax specification can be found in the document. The clause
> should be optional and fully backwards compatible.
>
> Regards,
> Timo
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause