Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Timo,

Fair enough, thanks for the clarification!

Best regards,
Jing

On Fri, Nov 3, 2023 at 8:16 AM Timo Walther wrote:
> If there are no objections, I would start with a voting on Monday.
>
> Thanks for the feedback everyone!
> [...]
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
If there are no objections, I would start a vote on Monday.

Thanks for the feedback everyone!

Regards,
Timo

On 02.11.23 13:49, Martijn Visser wrote:
> Hi all,
>
> From a user point of view, I think it makes sense to go for
> DISTRIBUTED BY with how Timo explained it. +1 for his proposal
> [...]
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi all,

From a user point of view, I think it makes sense to go for DISTRIBUTED BY with how Timo explained it. +1 for his proposal.

Best regards,

Martijn

On Thu, Nov 2, 2023 at 11:00 AM Timo Walther wrote:
> Hi Jing,
>
> I agree this is confusing. The Spark API calls it bucketBy in the
> programmatic API. But anyway, we should discuss the SQL semantics here.
> [...]
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Jing,

I agree this is confusing. The Spark API calls it bucketBy in the programmatic API. But anyway, we should discuss the SQL semantics here. It's like a "WHERE" is called "filter" in the programmatic world. Or a "SELECT" is called "projection" in code.

And looking at all the Hive tutorials[1], DISTRIBUTED BY should be more consistent. By using the "INTO n BUCKETS", we still include the bucketing terminology in the syntax for better understanding.

If there are no other objections to this topic, I would still prefer to go with DISTRIBUTED BY.

Regards,
Timo

[1] https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/

On 01.11.23 11:55, Jing Ge wrote:
> Hi Timo,
>
> Gotcha, let's use passive verbs. I am actually thinking about
> "BUCKETED BY 6" or "BUCKETED INTO 6".
>
> Not really used in SQL, but afaiu Spark uses the concept[1].
> [...]
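[Editor's note: what "DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS" expresses can be sketched in a few lines of Python. This is a conceptual illustration only; as noted elsewhere in the thread, the actual hash strategy is fully determined by each connector, not by the Flink SQL engine.]

```python
def bucket_of(uid: int, num_buckets: int = 6) -> int:
    """Conceptual bucket assignment for DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS.

    Illustration only: real connectors choose their own hash function.
    """
    return hash(uid) % num_buckets

rows = [(1, "alice"), (2, "bob"), (42, "carol"), (7, "dave")]
buckets = {}
for uid, name in rows:
    buckets.setdefault(bucket_of(uid), []).append((uid, name))

# Rows with the same uid deterministically land in the same bucket,
# so a bucketed sink can co-locate them in the same file or partition.
assert bucket_of(42) == bucket_of(42)
assert all(0 <= b < 6 for b in buckets)
```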
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY 6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1] https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html

Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther wrote:
> Hi Jing,
>
> > Have you considered using BUCKET BY directly?
>
> Which vendor uses this syntax? Most vendors that I checked call this
> concept "distribution".
> [...]
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Jark,

here are the checks I had in mind so far. But we can also discuss this during the implementation in the PRs. Most of the tasks are very similar to PARTITIONED BY, which is also a characteristic of a sink.

1) Check that the DISTRIBUTED BY columns reference physical columns, and at least one of them. In DefaultSchemaResolver, like we do for PARTITIONED BY.

2) Check that, if DISTRIBUTED is defined, the sink implements SupportsBucketing. In DynamicSinkUtils, like we do for metadata columns.

Currently, for sources we would only check for semantic correctness (1) but not more, like we do for PARTITIONED BY.

Do you have more checks in mind? Of course, during implementation I will make sure that all derived utils work properly, including CREATE TABLE LIKE.

Regards,
Timo

On 31.10.23 07:22, Jark Wu wrote:
> Hi Timo,
>
> Thank you for the update. The FLIP looks good to me now.
> I only have one more question. What does Flink check and throw
> exceptions for the bucketing?
> [...]
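[Editor's note: the two checks above could be sketched as follows. This is a hypothetical Python sketch of the validation logic only; Flink's actual implementation would live in the Java classes Timo names (DefaultSchemaResolver, DynamicSinkUtils), and the function name and error messages here are invented for illustration.]

```python
class ValidationError(Exception):
    """Raised when a DISTRIBUTED BY declaration is invalid (illustrative)."""

def validate_distribution(distributed_by, physical_columns, sink_supports_bucketing):
    """Sketch of the two checks described above."""
    if distributed_by is None:
        return  # no DISTRIBUTED clause declared, nothing to validate
    # Check 1: at least one column, and all referenced columns are physical.
    if not distributed_by:
        raise ValidationError("DISTRIBUTED BY must reference at least one column")
    unknown = set(distributed_by) - set(physical_columns)
    if unknown:
        raise ValidationError(
            f"DISTRIBUTED BY references non-physical columns: {sorted(unknown)}")
    # Check 2: the sink connector must implement SupportsBucketing.
    if not sink_supports_bucketing:
        raise ValidationError("Table sink does not implement SupportsBucketing")

# A valid declaration passes silently:
validate_distribution(["uid"], ["uid", "name"], sink_supports_bucketing=True)
```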
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Timo, Thank you for the update. The FLIP looks good to me now. I only have one more question. What does Flink check and throw exceptions for the bucketing? For example, do we check interfaces when executing create/alter DDL and when used as a source? Best, Jark On Tue, 31 Oct 2023 at 00:25, Timo Walther wrote: > Hi Jing, > > > Have you considered using BUCKET BY directly? > > Which vendor uses this syntax? Most vendors that I checked call this > concept "distribution". > > In any case, the "BY" is optional, so certain DDL statements would > declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED, > we should use the passive voice. > > > Did you mean users can use their own algorithm? How to do it? > > "own algorithm" only refers to deciding between a list of partitioning > strategies (namely hash and range partitioning) if the connector offers > more than one. > > Regards, > Timo > > > On 30.10.23 12:39, Jing Ge wrote: > > Hi Timo, > > > > The FLIP looks great! Thanks for bringing it to our attention! In order > to > > make sure we are on the same page, I would ask some questions: > > > > 1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao > mentioned > > which is used to distribute rows amond reducers, i.e. focusing on the > > shuffle during the computation. The FLIP is focusing more on storage, if > I > > am not mistaken. Have you considered using BUCKET BY directly? > > > > 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name > STRING) > > DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS > > > > - For advanced users, the algorithm can be defined explicitly. > > - Currently, either HASH() or RANGE(). > > > > " > > Did you mean users can use their own algorithm? How to do it? 
> > > > Best regards, > > Jing > > > > On Mon, Oct 30, 2023 at 11:13 AM Timo Walther > wrote: > > > >> Let me reply to the feedback from Yunfan: > >> > >> > Distribute by in DML is also supported by Hive > >> > >> I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This > >> discussion is about DDL. For DDL, we have more freedom as every vendor > >> has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly > >> connector to the connector implementation, not the engine. However, for > >> DML we need to watch out for standard compliance and introduce changes > >> with high caution. > >> > >> How a LookupTableSource interprets the DISTRIBUTED BY is > >> connector-dependent in my opinion. In general this FLIP is a sink > >> ability, but we could have a follow FLIP that helps in distributing load > >> of lookup joins. > >> > >> > to avoid data skew problem > >> > >> I understand the use case and that it is important to solve it > >> eventually. Maybe a solution might be to introduce helper Polymorphic > >> Table Functions [1] in the future instead of new syntax. > >> > >> [1] > >> > >> > https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf > >> > >> > >> Let me reply to the feedback from Benchao: > >> > >> > Do you think it's useful to add some extensibility for the hash > >> strategy > >> > >> The hash strategy is fully determined by the connector, not the Flink > >> SQL engine. We are not using Flink's hash strategy in any way. If the > >> hash strategy for the regular Flink file system connector should be > >> changed, this should be expressed via config option. Otherwise we should > >> offer a dedicated `hive-filesystem` or `spark-filesystem` connector. > >> > >> Regards, > >> Timo > >> > >> > >> On 30.10.23 10:44, Timo Walther wrote: > >>> Hi Jark, > >>> > >>> my intention was to avoid too complex syntax in the first version. 
In > >>> the past years, we could enable use cases also without this clause, so > >>> we should be careful with overloading it with too functionality in the > >>> first version. We can still iterate on it later, the interfaces are > >>> flexible enough to support more in the future. > >>> > >>> I agree that maybe an explicit HASH and RANGE doesn't harm. Also making > >>> the bucket number optional. > >>> > >>> I updated the FLIP accordingly. Now the SupportsBucketing interface > >>> declares more methods that help in validation and proving helpful error > >>> messages to users. > >>> > >>> Let me know what you think. > >>> > >>> Regards, > >>> Timo > >>> > >>> > >>> On 27.10.23 10:20, Jark Wu wrote: > Hi Timo, > > Thanks for starting this discussion. I really like it! > The FLIP is already in good shape, I only have some minor comments. > > 1. Could we also support HASH and RANGE distribution kind on the DDL > syntax? > I noticed that HASH and UNKNOWN are introduced in the Java API, but > not in > the syntax. > > 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER > >> TABLE? > Some storage engines support automatically determining the bucket > number > based on > the cluster resources and data size of the table.
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Jing, > Have you considered using BUCKET BY directly? Which vendor uses this syntax? Most vendors that I checked call this concept "distribution". In any case, the "BY" is optional, so certain DDL statements would declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED, we should use the passive voice. > Did you mean users can use their own algorithm? How to do it? "own algorithm" only refers to deciding between a list of partitioning strategies (namely hash and range partitioning) if the connector offers more than one. Regards, Timo On 30.10.23 12:39, Jing Ge wrote: Hi Timo, The FLIP looks great! Thanks for bringing it to our attention! In order to make sure we are on the same page, I would ask some questions: 1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao mentioned which is used to distribute rows amond reducers, i.e. focusing on the shuffle during the computation. The FLIP is focusing more on storage, if I am not mistaken. Have you considered using BUCKET BY directly? 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS - For advanced users, the algorithm can be defined explicitly. - Currently, either HASH() or RANGE(). " Did you mean users can use their own algorithm? How to do it? Best regards, Jing On Mon, Oct 30, 2023 at 11:13 AM Timo Walther wrote: Let me reply to the feedback from Yunfan: > Distribute by in DML is also supported by Hive I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This discussion is about DDL. For DDL, we have more freedom as every vendor has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly connector to the connector implementation, not the engine. However, for DML we need to watch out for standard compliance and introduce changes with high caution. How a LookupTableSource interprets the DISTRIBUTED BY is connector-dependent in my opinion. 
In general this FLIP is a sink ability, but we could have a follow FLIP that helps in distributing load of lookup joins. > to avoid data skew problem I understand the use case and that it is important to solve it eventually. Maybe a solution might be to introduce helper Polymorphic Table Functions [1] in the future instead of new syntax. [1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf Let me reply to the feedback from Benchao: > Do you think it's useful to add some extensibility for the hash strategy The hash strategy is fully determined by the connector, not the Flink SQL engine. We are not using Flink's hash strategy in any way. If the hash strategy for the regular Flink file system connector should be changed, this should be expressed via config option. Otherwise we should offer a dedicated `hive-filesystem` or `spark-filesystem` connector. Regards, Timo On 30.10.23 10:44, Timo Walther wrote: Hi Jark, my intention was to avoid too complex syntax in the first version. In the past years, we could enable use cases also without this clause, so we should be careful with overloading it with too functionality in the first version. We can still iterate on it later, the interfaces are flexible enough to support more in the future. I agree that maybe an explicit HASH and RANGE doesn't harm. Also making the bucket number optional. I updated the FLIP accordingly. Now the SupportsBucketing interface declares more methods that help in validation and proving helpful error messages to users. Let me know what you think. Regards, Timo On 27.10.23 10:20, Jark Wu wrote: Hi Timo, Thanks for starting this discussion. I really like it! The FLIP is already in good shape, I only have some minor comments. 1. Could we also support HASH and RANGE distribution kind on the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax. 2. 
Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. For example, StarRocks[1] and Paimon[2]. Best, Jark [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: Very thanks Timo for starting this discussion. Big +1 for this. The design looks good to me! We can add some documentation for connector developers. For example: for sink, If there needs some keyby, please finish the keyby by the connector itself. SupportsBucketing is just a marker interface. Best, Jingsong On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: Hi everyone, I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1]. Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering. This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink.
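The difference between the two strategies discussed above can be illustrated with a small, engine-agnostic sketch (plain Python, not Flink code; the function names and the choice of CRC32 are illustrative assumptions, since the FLIP leaves the concrete algorithm to the connector):

```python
import bisect
import zlib

def hash_bucket(key, num_buckets):
    # HASH(...): a stable hash of the key, modulo the bucket count.
    # CRC32 is only a stand-in; each connector picks its own function.
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets

def range_bucket(key, boundaries):
    # RANGE(...): compare the key against sorted split points;
    # bucket i holds the keys between boundaries[i-1] and boundaries[i].
    return bisect.bisect_right(boundaries, key)
```

With `boundaries = [100, 200]`, keys below 100 land in bucket 0, keys from 100 to 199 in bucket 1, and the rest in bucket 2, whereas `hash_bucket` spreads keys without regard to their order.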
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Timo, The FLIP looks great! Thanks for bringing it to our attention! In order to make sure we are on the same page, I would like to ask some questions: 1. DISTRIBUTED BY reminds me of DISTRIBUTE BY from Hive, which, as Benchao mentioned, is used to distribute rows among reducers, i.e. focusing on the shuffle during the computation. The FLIP is focusing more on storage, if I am not mistaken. Have you considered using BUCKET BY directly? 2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS - For advanced users, the algorithm can be defined explicitly. - Currently, either HASH() or RANGE(). " Did you mean users can use their own algorithm? How to do it? Best regards, Jing On Mon, Oct 30, 2023 at 11:13 AM Timo Walther wrote: > Let me reply to the feedback from Yunfan: > > > Distribute by in DML is also supported by Hive > > I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This > discussion is about DDL. For DDL, we have more freedom as every vendor > has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly > connector to the connector implementation, not the engine. However, for > DML we need to watch out for standard compliance and introduce changes > with high caution. > > How a LookupTableSource interprets the DISTRIBUTED BY is > connector-dependent in my opinion. In general this FLIP is a sink > ability, but we could have a follow FLIP that helps in distributing load > of lookup joins. > > > to avoid data skew problem > > I understand the use case and that it is important to solve it > eventually. Maybe a solution might be to introduce helper Polymorphic > Table Functions [1] in the future instead of new syntax.
> > [1] > > https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf > > > Let me reply to the feedback from Benchao: > > > Do you think it's useful to add some extensibility for the hash > strategy > > The hash strategy is fully determined by the connector, not the Flink > SQL engine. We are not using Flink's hash strategy in any way. If the > hash strategy for the regular Flink file system connector should be > changed, this should be expressed via config option. Otherwise we should > offer a dedicated `hive-filesystem` or `spark-filesystem` connector. > > Regards, > Timo > > > On 30.10.23 10:44, Timo Walther wrote: > > Hi Jark, > > > > my intention was to avoid too complex syntax in the first version. In > > the past years, we could enable use cases also without this clause, so > > we should be careful with overloading it with too functionality in the > > first version. We can still iterate on it later, the interfaces are > > flexible enough to support more in the future. > > > > I agree that maybe an explicit HASH and RANGE doesn't harm. Also making > > the bucket number optional. > > > > I updated the FLIP accordingly. Now the SupportsBucketing interface > > declares more methods that help in validation and proving helpful error > > messages to users. > > > > Let me know what you think. > > > > Regards, > > Timo > > > > > > On 27.10.23 10:20, Jark Wu wrote: > >> Hi Timo, > >> > >> Thanks for starting this discussion. I really like it! > >> The FLIP is already in good shape, I only have some minor comments. > >> > >> 1. Could we also support HASH and RANGE distribution kind on the DDL > >> syntax? > >> I noticed that HASH and UNKNOWN are introduced in the Java API, but > >> not in > >> the syntax. > >> > >> 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER > TABLE? 
> >> Some storage engines support automatically determining the bucket number > >> based on > >> the cluster resources and data size of the table. For example, > >> StarRocks[1] > >> and Paimon[2]. > >> > >> Best, > >> Jark > >> > >> [1]: > >> > https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets > >> [2]: > >> > https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket > >> > >> On Thu, 26 Oct 2023 at 18:26, Jingsong Li > wrote: > >> > >>> Very thanks Timo for starting this discussion. > >>> > >>> Big +1 for this. > >>> > >>> The design looks good to me! > >>> > >>> We can add some documentation for connector developers. For example: > >>> for sink, If there needs some keyby, please finish the keyby by the > >>> connector itself. SupportsBucketing is just a marker interface. > >>> > >>> Best, > >>> Jingsong > >>> > >>> On Thu, Oct 26, 2023 at 5:00 PM Timo Walther > wrote: > > Hi everyone, > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY > clause [1]. > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and > Clustering. This FLIP continues the work of previous FLIPs and would > like to introduce the concept of "Bucketing" to Flink. > > This is a pure connector characteristic and helps both Apache Kafka > and
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Let me reply to the feedback from Yunfan: > Distribute by in DML is also supported by Hive I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This discussion is about DDL. For DDL, we have more freedom as every vendor has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly tied to the connector implementation, not the engine. However, for DML we need to watch out for standard compliance and introduce changes with high caution. How a LookupTableSource interprets the DISTRIBUTED BY is connector-dependent in my opinion. In general this FLIP is a sink ability, but we could have a follow-up FLIP that helps in distributing the load of lookup joins. > to avoid data skew problem I understand the use case and that it is important to solve it eventually. A solution might be to introduce helper Polymorphic Table Functions [1] in the future instead of new syntax. [1] https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf Let me reply to the feedback from Benchao: > Do you think it's useful to add some extensibility for the hash strategy The hash strategy is fully determined by the connector, not the Flink SQL engine. We are not using Flink's hash strategy in any way. If the hash strategy for the regular Flink file system connector should be changed, this should be expressed via a config option. Otherwise we should offer a dedicated `hive-filesystem` or `spark-filesystem` connector. Regards, Timo On 30.10.23 10:44, Timo Walther wrote: Hi Jark, my intention was to avoid too complex syntax in the first version. In the past years, we could enable use cases also without this clause, so we should be careful with overloading it with too functionality in the first version. We can still iterate on it later, the interfaces are flexible enough to support more in the future. I agree that maybe an explicit HASH and RANGE doesn't harm. Also making the bucket number optional. I updated the FLIP accordingly.
Now the SupportsBucketing interface declares more methods that help in validation and proving helpful error messages to users. Let me know what you think. Regards, Timo On 27.10.23 10:20, Jark Wu wrote: Hi Timo, Thanks for starting this discussion. I really like it! The FLIP is already in good shape, I only have some minor comments. 1. Could we also support HASH and RANGE distribution kind on the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax. 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. For example, StarRocks[1] and Paimon[2]. Best, Jark [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: Very thanks Timo for starting this discussion. Big +1 for this. The design looks good to me! We can add some documentation for connector developers. For example: for sink, If there needs some keyby, please finish the keyby by the connector itself. SupportsBucketing is just a marker interface. Best, Jingsong On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: Hi everyone, I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1]. Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering. This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink. This is a pure connector characteristic and helps both Apache Kafka and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax. Here is an example: CREATE TABLE MyTable ( uid BIGINT, name STRING ) DISTRIBUTED BY (uid) INTO 6 BUCKETS WITH ( 'connector' = 'kafka' ) The full syntax specification can be found in the document. 
The clause should be optional and fully backwards compatible. Regards, Timo [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Yunfan and Benchao, it seems the FLIP discussion thread got split into two parts. At least this is what I see in my mail program. I would kindly ask to answer in the other thread [1]. I will also reply there now to maintain the discussion link. Regards, Timo [1] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1 On 28.10.23 10:34, Benchao Li wrote: Thanks Timo for preparing the FLIP. Regarding "By default, DISTRIBUTED BY assumes a list of columns for an implicit hash partitioning." Do you think it's useful to add some extensibility for the hash strategy. One scenario I can foresee is if we write bucketed data into Hive, and if Flink's hash strategy is different than Hive/Spark's, then they could not utilize the bucketed data written by Flink. This is the one case I met in production already, there may be more cases like this that needs customize the hash strategy to accommodate with existing systems. yunfan zhang 于2023年10月27日周五 19:06写道: Distribute by in DML is also supported by Hive. And it is also useful for flink. Users can use this ability to increase cache hit rate in lookup join. And users can use "distribute by key, rand(1, 10)” to avoid data skew problem. And I think it is another way to solve this Flip204[1] There is already has some people required this feature[2] [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join [2] https://issues.apache.org/jira/browse/FLINK-27541 On 2023/10/27 08:20:25 Jark Wu wrote: Hi Timo, Thanks for starting this discussion. I really like it! The FLIP is already in good shape, I only have some minor comments. 1. Could we also support HASH and RANGE distribution kind on the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax. 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. 
For example, StarRocks[1] and Paimon[2]. Best, Jark [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: Very thanks Timo for starting this discussion. Big +1 for this. The design looks good to me! We can add some documentation for connector developers. For example: for sink, If there needs some keyby, please finish the keyby by the connector itself. SupportsBucketing is just a marker interface. Best, Jingsong On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: Hi everyone, I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1]. Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering. This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink. This is a pure connector characteristic and helps both Apache Kafka and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax. Here is an example: CREATE TABLE MyTable ( uid BIGINT, name STRING ) DISTRIBUTED BY (uid) INTO 6 BUCKETS WITH ( 'connector' = 'kafka' ) The full syntax specification can be found in the document. The clause should be optional and fully backwards compatible. Regards, Timo [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Jark, my intention was to avoid overly complex syntax in the first version. In the past years, we could enable use cases also without this clause, so we should be careful with overloading it with too much functionality in the first version. We can still iterate on it later; the interfaces are flexible enough to support more in the future. I agree that maybe an explicit HASH and RANGE doesn't harm. Also making the bucket number optional. I updated the FLIP accordingly. Now the SupportsBucketing interface declares more methods that help in validation and providing helpful error messages to users. Let me know what you think. Regards, Timo On 27.10.23 10:20, Jark Wu wrote: Hi Timo, Thanks for starting this discussion. I really like it! The FLIP is already in good shape, I only have some minor comments. 1. Could we also support HASH and RANGE distribution kind on the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax. 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. For example, StarRocks[1] and Paimon[2]. Best, Jark [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: Very thanks Timo for starting this discussion. Big +1 for this. The design looks good to me! We can add some documentation for connector developers. For example: for sink, If there needs some keyby, please finish the keyby by the connector itself. SupportsBucketing is just a marker interface. Best, Jingsong On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: Hi everyone, I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY clause [1]. Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering.
This FLIP continues the work of previous FLIPs and would like to introduce the concept of "Bucketing" to Flink. This is a pure connector characteristic and helps both Apache Kafka and Apache Paimon connectors in avoiding a complex WITH clause by providing improved syntax. Here is an example: CREATE TABLE MyTable ( uid BIGINT, name STRING ) DISTRIBUTED BY (uid) INTO 6 BUCKETS WITH ( 'connector' = 'kafka' ) The full syntax specification can be found in the document. The clause should be optional and fully backwards compatible. Regards, Timo [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
Re: Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Thanks Timo for preparing the FLIP. Regarding "By default, DISTRIBUTED BY assumes a list of columns for an implicit hash partitioning." Do you think it's useful to add some extensibility for the hash strategy? One scenario I can foresee is if we write bucketed data into Hive, and if Flink's hash strategy is different from Hive/Spark's, then they could not utilize the bucketed data written by Flink. This is one case I have already met in production; there may be more cases like this that need a customizable hash strategy to interoperate with existing systems. yunfan zhang wrote on Fri, Oct 27, 2023 at 19:06: > > Distribute by in DML is also supported by Hive. > And it is also useful for flink. > Users can use this ability to increase cache hit rate in lookup join. > And users can use "distribute by key, rand(1, 10)” to avoid data skew problem. > And I think it is another way to solve this Flip204[1] > There is already has some people required this feature[2] > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join > [2] https://issues.apache.org/jira/browse/FLINK-27541 > > On 2023/10/27 08:20:25 Jark Wu wrote: > > Hi Timo, > > > > Thanks for starting this discussion. I really like it! > > The FLIP is already in good shape, I only have some minor comments. > > > > 1. Could we also support HASH and RANGE distribution kind on the DDL > > syntax? > > I noticed that HASH and UNKNOWN are introduced in the Java API, but not in > > the syntax. > > > > 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? > > Some storage engines support automatically determining the bucket number > > based on > > the cluster resources and data size of the table. For example, StarRocks[1] > > and Paimon[2].
> > > > Best, > > Jark > > > > [1]: > > https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets > > [2]: > > https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket > > > > On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: > > > > > Very thanks Timo for starting this discussion. > > > > > > Big +1 for this. > > > > > > The design looks good to me! > > > > > > We can add some documentation for connector developers. For example: > > > for sink, If there needs some keyby, please finish the keyby by the > > > connector itself. SupportsBucketing is just a marker interface. > > > > > > Best, > > > Jingsong > > > > > > On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: > > > > > > > > Hi everyone, > > > > > > > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY > > > > clause [1]. > > > > > > > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and > > > > Clustering. This FLIP continues the work of previous FLIPs and would > > > > like to introduce the concept of "Bucketing" to Flink. > > > > > > > > This is a pure connector characteristic and helps both Apache Kafka and > > > > Apache Paimon connectors in avoiding a complex WITH clause by providing > > > > improved syntax. > > > > > > > > Here is an example: > > > > > > > > CREATE TABLE MyTable > > > >( > > > > uid BIGINT, > > > > name STRING > > > >) > > > >DISTRIBUTED BY (uid) INTO 6 BUCKETS > > > >WITH ( > > > > 'connector' = 'kafka' > > > >) > > > > > > > > The full syntax specification can be found in the document. The clause > > > > should be optional and fully backwards compatible. > > > > > > > > Regards, > > > > Timo > > > > > > > > [1] > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause > > > > > -- Best, Benchao Li
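Benchao's interoperability concern can be made concrete with a small sketch (plain Python; the two hash functions are arbitrary stand-ins for, say, Flink's and Hive's strategies, not the actual algorithms either system uses):

```python
import hashlib
import zlib

def bucket_engine_a(key, num_buckets):
    # Engine A buckets by CRC32 (stand-in for one system's hash strategy).
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets

def bucket_engine_b(key, num_buckets):
    # Engine B buckets by MD5 (stand-in for another system's strategy).
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets

def strategies_agree(keys, num_buckets):
    # Reader-side bucket pruning is only safe when every key lands in the
    # same bucket under both strategies.
    return all(bucket_engine_a(k, num_buckets) == bucket_engine_b(k, num_buckets)
               for k in keys)
```

In practice the two assignments diverge for most keys, which is exactly why a reader that assumes its own hash function cannot safely prune buckets written by another engine.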
RE: Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Distribute by in DML is also supported by Hive. And it is also useful for Flink. Users can use this ability to increase the cache hit rate in lookup joins. And users can use "distribute by key, rand(1, 10)" to avoid the data skew problem. And I think it is another way to solve FLIP-204[1]. Some people have already requested this feature[2]. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join [2] https://issues.apache.org/jira/browse/FLINK-27541 On 2023/10/27 08:20:25 Jark Wu wrote: > Hi Timo, > > Thanks for starting this discussion. I really like it! > The FLIP is already in good shape, I only have some minor comments. > > 1. Could we also support HASH and RANGE distribution kind on the DDL > syntax? > I noticed that HASH and UNKNOWN are introduced in the Java API, but not in > the syntax. > > 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? > Some storage engines support automatically determining the bucket number > based on > the cluster resources and data size of the table. For example, StarRocks[1] > and Paimon[2]. > > Best, > Jark > > [1]: > https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets > [2]: > https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket > > On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: > > > Very thanks Timo for starting this discussion. > > > > Big +1 for this. > > > > The design looks good to me! > > > > We can add some documentation for connector developers. For example: > > for sink, If there needs some keyby, please finish the keyby by the > > connector itself. SupportsBucketing is just a marker interface. > > > > Best, > > Jingsong > > > > On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: > > > > > > Hi everyone, > > > > > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY > > > clause [1].
> > > > > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and > > > Clustering. This FLIP continues the work of previous FLIPs and would > > > like to introduce the concept of "Bucketing" to Flink. > > > > > > This is a pure connector characteristic and helps both Apache Kafka and > > > Apache Paimon connectors in avoiding a complex WITH clause by providing > > > improved syntax. > > > > > > Here is an example: > > > > > > CREATE TABLE MyTable > > >( > > > uid BIGINT, > > > name STRING > > >) > > >DISTRIBUTED BY (uid) INTO 6 BUCKETS > > >WITH ( > > > 'connector' = 'kafka' > > >) > > > > > > The full syntax specification can be found in the document. The clause > > > should be optional and fully backwards compatible. > > > > > > Regards, > > > Timo > > > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause > > >
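Yunfan's "distribute by key, rand(1, 10)" trick is essentially key salting, which can be sketched outside of SQL like this (plain Python; the salted-key shape is an illustrative assumption, not a Flink API):

```python
import random

def salted_key(key, num_salts=10):
    # A hot key is turned into (key, salt) pairs so its rows spread across
    # up to num_salts downstream tasks instead of overloading a single one.
    return (key, random.randint(1, num_salts))
```

Downstream aggregation then happens in two steps: first per (key, salt) pair, followed by a cheap final merge per key.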
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Hi Timo, Thanks for starting this discussion. I really like it! The FLIP is already in good shape, I only have some minor comments. 1. Could we also support HASH and RANGE distribution kind on the DDL syntax? I noticed that HASH and UNKNOWN are introduced in the Java API, but not in the syntax. 2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE? Some storage engines support automatically determining the bucket number based on the cluster resources and data size of the table. For example, StarRocks[1] and Paimon[2]. Best, Jark [1]: https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets [2]: https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket On Thu, 26 Oct 2023 at 18:26, Jingsong Li wrote: > Very thanks Timo for starting this discussion. > > Big +1 for this. > > The design looks good to me! > > We can add some documentation for connector developers. For example: > for sink, If there needs some keyby, please finish the keyby by the > connector itself. SupportsBucketing is just a marker interface. > > Best, > Jingsong > > On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: > > > > Hi everyone, > > > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY > > clause [1]. > > > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and > > Clustering. This FLIP continues the work of previous FLIPs and would > > like to introduce the concept of "Bucketing" to Flink. > > > > This is a pure connector characteristic and helps both Apache Kafka and > > Apache Paimon connectors in avoiding a complex WITH clause by providing > > improved syntax. > > > > Here is an example: > > > > CREATE TABLE MyTable > >( > > uid BIGINT, > > name STRING > >) > >DISTRIBUTED BY (uid) INTO 6 BUCKETS > >WITH ( > > 'connector' = 'kafka' > >) > > > > The full syntax specification can be found in the document. The clause > > should be optional and fully backwards compatible. 
> > > > Regards, > > Timo > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause >
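Jark's suggestion to make "INTO n BUCKETS" optional implies the engine derives a bucket count itself. A naive version of what StarRocks/Paimon-style dynamic bucketing might compute looks like this (plain Python; the 1 GB target and 1024 cap are invented defaults for illustration, not values from either system):

```python
import math

def auto_bucket_count(estimated_bytes, target_bucket_bytes=1 << 30, max_buckets=1024):
    # Aim for roughly target_bucket_bytes of data per bucket,
    # clamped to at least one bucket and at most max_buckets.
    return max(1, min(max_buckets, math.ceil(estimated_bytes / target_bucket_bytes)))
```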
Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause
Many thanks, Timo, for starting this discussion. Big +1 for this. The design looks good to me! We can add some documentation for connector developers. For example: for sinks, if a keyBy is needed, the connector should perform the keyBy itself. SupportsBucketing is just a marker interface. Best, Jingsong On Thu, Oct 26, 2023 at 5:00 PM Timo Walther wrote: > > Hi everyone, > > I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY > clause [1]. > > Many SQL vendors expose the concepts of Partitioning, Bucketing, and > Clustering. This FLIP continues the work of previous FLIPs and would > like to introduce the concept of "Bucketing" to Flink. > > This is a pure connector characteristic and helps both Apache Kafka and > Apache Paimon connectors in avoiding a complex WITH clause by providing > improved syntax. > > Here is an example: > > CREATE TABLE MyTable >( > uid BIGINT, > name STRING >) >DISTRIBUTED BY (uid) INTO 6 BUCKETS >WITH ( > 'connector' = 'kafka' >) > > The full syntax specification can be found in the document. The clause > should be optional and fully backwards compatible. > > Regards, > Timo > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
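Jingsong's point that the connector, not the engine, performs any required keyBy can be sketched as follows (plain Python; the CRC32 hash and the bucket-to-subtask mapping are illustrative assumptions, not the actual Flink runtime or any connector's implementation):

```python
import zlib

def target_writer_subtask(row_key, num_buckets, parallelism):
    # The sink derives the bucket from the row key itself and then maps
    # buckets onto its writer subtasks; the engine adds no implicit keyBy,
    # since SupportsBucketing is only a marker interface.
    bucket = zlib.crc32(str(row_key).encode("utf-8")) % num_buckets
    return bucket % parallelism
```

The assignment is deterministic per key, so all rows of one bucket end up at the same writer regardless of which upstream task produced them.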