Hi Jingsong,

Thanks for the clarification, and sorry for misunderstanding your original intention. What I was talking about is indeed another topic; we can leave it for the future and see whether other people have the same scenarios.
Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Sep 3, 2020 at 10:56 AM:

> Thanks Timo for working on FLIP-107.
>
> Agree, I think it is good.
> I'll spend more time to form a detailed FLIP later.
>
> Best,
> Jingsong
>
> On Wed, Sep 2, 2020 at 7:12 PM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Jingsong,
> >
> > I haven't looked at your proposal yet, but I think it makes sense to
> > have a separate FLIP for the partitioning topic. I'm currently working
> > on an update to FLIP-107 and would suggest removing the partitioning
> > topic there. FLIP-107 will only focus on accessing metadata and
> > expressing key/value formats.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 07:39, Jingsong Li wrote:
> > > Thanks Konstantin and Benchao for your responses.
> > >
> > > If we want to push the implementation forward, it should be a FLIP.
> > >
> > > My original intention was to unify the partition definitions for
> > > batch and streaming:
> > >
> > > - What is a "PARTITION" on a table? Partitions define the physical
> > > storage form of a table. Different partitions should be stored in
> > > different places, with good isolation between them. Therefore, at the
> > > connector level, we can operate on a single partition separately.
> > > - For a Kafka table, the partition is a finite integer value,
> > > depending on how many partitions the Kafka topic has.
> > > - For a Filesystem table, the partition is a directory structure,
> > > which may be multi-level. It can be of any type, because any type can
> > > be converted to a character string.
> > >
> > > So, how do we generate a partition value? It can be mapped directly
> > > from a field (identity). In addition, the partition value can be
> > > generated by a function (transform); this is what I want to discuss
> > > with you.
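[Editor's note] The identity vs. transform distinction above can be made concrete with a small sketch in plain Python (illustrative only, not Flink code; the function names are hypothetical): an identity partition value is a field value as-is, while a transform partition value is computed from a field, as Iceberg does when deriving event_date from event_time.

```python
from datetime import datetime

# Illustrative sketch, not Flink code: two ways to derive a partition value.

def identity_partition(record, field):
    """Identity: the partition value is the field's value itself."""
    return record[field]

def day_transform_partition(record, field):
    """Transform: the partition value is computed from a field, e.g. an
    event timestamp mapped to its date (an Iceberg-style day transform)."""
    return record[field].date().isoformat()

record = {"id": "a1", "event_time": datetime(2020, 9, 3, 10, 56)}
print(identity_partition(record, "id"))               # a1
print(day_transform_partition(record, "event_time"))  # 2020-09-03
```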
> > >
> > > ## To Konstantin:
> > >
> > >> 1) Does the PARTITION BY clause only have an effect for sink
> > >> tables, defining how data should be partitioned in the sink system,
> > >> or does it also make a difference for source tables? My
> > >> understanding is that it also makes a difference for source tables
> > >> (e.g. if the source system supports partition pruning). I suppose,
> > >> for source tables, Flink does not check/enforce this, but trusts
> > >> that the partitioning information is correct?!
> > >
> > > Yes, it also works for sources. In my understanding, partition
> > > pruning is actually a kind of filter push-down: the source is
> > > responsible for reading only the data of the matching partitions.
> > >
> > >> 2) I suppose it is up to the connector implementation whether/how
> > >> to interpret the partition information. How will this work?
> > >
> > > I think it depends on the interface and implementation. For Kafka,
> > > for example, partition fields can be defined in the DDL, and filter
> > > conditions on those partition fields can appear in the query. The
> > > Kafka source can then act on these filter conditions.
> > >
> > >> 3) For Kafka, I suppose, the most common partitioning strategy is
> > >> by key. FLIP-107 contains a proposal on how to define the key
> > >> (which fields of the schema should become part of the key) when
> > >> writing to Kafka via Flink SQL. How does this relate to the
> > >> PARTITION BY clause?
> > >
> > > How a partition is defined should have nothing to do with whether
> > > the fields it refers to are keys. Users can refer to any fields in
> > > the schema.
> > >
> > > ## To Benchao,
> > >
> > > I feel that what you describe is ShuffleBy at the computation
> > > level? That's a good topic, but this discussion is mainly about
> > > partitions in connector storage.
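[Editor's note] The point above, that partition pruning is a kind of filter push-down, can be sketched like this (plain Python with hypothetical names, not a Flink interface): the source receives the pushed-down predicate and never reads the partitions that fail it.

```python
# Illustrative sketch, not a Flink interface: partition pruning as filter
# push-down. The source lists the table's partitions and scans only those
# that satisfy the pushed-down predicate; the rest are never read.

def prune_partitions(partitions, predicate):
    return [p for p in partitions if predicate(p)]

all_partitions = ["2020-08-30", "2020-08-31", "2020-09-01", "2020-09-02"]

# A query filter like WHERE dt >= '2020-09-01' pushed down to the source:
to_scan = prune_partitions(all_partitions, lambda dt: dt >= "2020-09-01")
print(to_scan)  # ['2020-09-01', '2020-09-02']
```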
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <libenc...@apache.org> wrote:
> > >
> > >> Hi Jingsong,
> > >>
> > >> Thanks for bringing up this discussion. I like this idea generally.
> > >> I'd like to add some cases we met in our scenarios.
> > >>
> > >> ## Source Partition By
> > >> There is a use case where users want to do some lookup in a UDF,
> > >> much like a dimension table. It's common for them to cache some
> > >> lookup results in their UDF. If there is no 'partition by' on the
> > >> source, they may need to cache more data in each subtask.
> > >> Actually, what they need is `keyBy` from the DataStream world.
> > >> We have already supported this feature in our production.
> > >>
> > >> ## Sink Partition By
> > >> Recently we also received some requirements for this feature.
> > >> Users want to add their custom sink and need the data shuffled
> > >> before the sink. They will do something with the data of the same
> > >> partition key.
> > >>
> > >> As an addition to the 'Source Partition By' semantics: actually,
> > >> it's not enough for current use cases. The more common way to do
> > >> this is to add the partition-by semantics to a 'view'; then users
> > >> can do the 'keyBy' multiple times in one query.
> > >>
> > >> I have no strong opinions about these features, just adding some
> > >> use cases, and I would like to hear more opinions about this.
> > >>
> > >>
> > >> Konstantin Knauf <kna...@apache.org> wrote on Mon, Aug 31, 2020 at 7:09 PM:
> > >>
> > >>> Hi Jingsong,
> > >>>
> > >>> I would like to understand this FLIP (?) a bit better, but I am
> > >>> missing some background, I believe. So, some basic questions:
> > >>>
> > >>> 1) Does the PARTITION BY clause only have an effect for sink
> > >>> tables, defining how data should be partitioned in the sink
> > >>> system, or does it also make a difference for source tables?
> > >>> My understanding is that it also makes a difference for source
> > >>> tables (e.g. if the source system supports partition pruning). I
> > >>> suppose, for source tables, Flink does not check/enforce this, but
> > >>> trusts that the partitioning information is correct?!
> > >>>
> > >>> 2) I suppose it is up to the connector implementation whether/how
> > >>> to interpret the partition information. How will this work?
> > >>>
> > >>> 3) For Kafka, I suppose, the most common partitioning strategy is
> > >>> by key. FLIP-107 contains a proposal on how to define the key
> > >>> (which fields of the schema should become part of the key) when
> > >>> writing to Kafka via Flink SQL. How does this relate to the
> > >>> PARTITION BY clause?
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Konstantin
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> ## Motivation
> > >>>>
> > >>>> FLIP-63 [1] introduced initial support for the PARTITIONED BY
> > >>>> clause, to an extent that lets us support Hive's partitioning.
> > >>>> But this partition definition is completely specific to Hive/file
> > >>>> systems. With the continuous development of the system, there are
> > >>>> new requirements:
> > >>>>
> > >>>> - FLIP-107 [2] requirements: A common requirement is to create a
> > >>>> custom partitioning of the data. We should have a way to
> > >>>> specify/compute the target partition/shard for
> > >>>> Kinesis/Pravega/Pulsar. In those cases it would be the only way to
> > >>>> control partitioning.
> > >>>>
> > >>>> - Apache Iceberg partitioning [3] requirements: Iceberg produces
> > >>>> partition values by taking a column value and optionally
> > >>>> transforming it. Iceberg is responsible for converting event_time
> > >>>> into event_date, and keeps track of the relationship.
> > >>>>
> > >>>> So I think it is better to introduce partitioning strategies to
> > >>>> Flink; these partitioning strategies are similar to partitioning
> > >>>> in traditional databases like Oracle [4].
> > >>>>
> > >>>> ## Proposed Partitioning DDL
> > >>>>
> > >>>> Hash Partitioning Tables:
> > >>>>
> > >>>> CREATE TABLE kafka_table (
> > >>>>   id STRING,
> > >>>>   name STRING,
> > >>>>   date DATE ... )
> > >>>> PARTITIONED BY (HASH(id, name))
> > >>>>
> > >>>> Explicit Partitioning Tables (Introduced in FLIP-63):
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>   name STRING,
> > >>>>   date DATE ... )
> > >>>> PARTITIONED BY (date)
> > >>>>
> > >>>> (Can we remove the brackets when there is only a single-level
> > >>>> partition? => "PARTITIONED BY HASH(id, name)" and "PARTITIONED BY
> > >>>> date")
> > >>>>
> > >>>> Composite Partitioning Tables:
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>   name STRING,
> > >>>>   date DATE
> > >>>>   ... )
> > >>>> PARTITIONED BY (year(date), month(date), day(date))
> > >>>>
> > >>>> Composite Explicit Partitioning Tables (Introduced in FLIP-63):
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>   name STRING,
> > >>>>   date DATE,
> > >>>>   y STRING,
> > >>>>   m STRING,
> > >>>>   d STRING,
> > >>>>   ...
> > >>>> )
> > >>>> PARTITIONED BY (y, m, d)
> > >>>>
> > >>>> ## Rejected Alternatives
> > >>>>
> > >>>> Composite Partitioning Tables DDL like Oracle:
> > >>>>
> > >>>> CREATE TABLE invoices (
> > >>>>   invoice_no NUMBER NOT NULL,
> > >>>>   invoice_date DATE NOT NULL,
> > >>>>   comments VARCHAR2(500))
> > >>>> PARTITION BY RANGE (invoice_date)
> > >>>> SUBPARTITION BY HASH (invoice_no)
> > >>>> SUBPARTITIONS 8 (
> > >>>>   PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
> > >>>>   PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
> > >>>>   PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
> > >>>>   PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));
> > >>>>
> > >>>> - First, multi-level partitioning is a common thing in big data
> > >>>> systems.
> > >>>> - Second, the "SUBPARTITION" syntax is not only more complex, but
> > >>>> also completely different from big data systems such as Hive. Big
> > >>>> data systems need to specify less partition information than
> > >>>> traditional ones, so it is more natural to write all partitions in
> > >>>> one bracket.
> > >>>>
> > >>>> ## Other Interface Changes
> > >>>>
> > >>>> It can be imagined that this change will involve many
> > >>>> Catalog/Table related interfaces, and it will be necessary to
> > >>>> replace the previous `List<String> partitionKeys` with
> > >>>> partitioning strategies.
> > >>>>
> > >>>> What do you think?
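[Editor's note] For the proposed PARTITIONED BY (HASH(id, name)), the sink-side semantics could look roughly like this sketch (plain Python with hypothetical names, not Flink code): hash the partition fields and take the result modulo the number of partitions in the sink, e.g. a Kafka topic's partition count.

```python
import zlib

# Illustrative sketch, not Flink code: computing the target partition of a
# sink like Kafka for PARTITIONED BY (HASH(id, name)).

def target_partition(row, partition_fields, num_partitions):
    key = "|".join(str(row[f]) for f in partition_fields)
    # crc32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(key.encode("utf-8")) % num_partitions

row = {"id": "42", "name": "flink", "date": "2020-09-03"}
p = target_partition(row, ["id", "name"], 16)
assert 0 <= p < 16  # rows with equal (id, name) always land on the same partition
```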
> > >>>> > > >>>> [1] > > >>>> > > >>>> > > >>> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support > > >>>> [2] > > >>>> > > >>>> > > >>> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records > > >>>> [3]http://iceberg.apache.org/partitioning/ > > >>>> [4] > https://oracle-base.com/articles/8i/partitioned-tables-and-indexes > > >>>> > > >>>> Best, > > >>>> Jingsong > > >>>> > > >>> > > >>> > > >>> -- > > >>> > > >>> Konstantin Knauf > > >>> > > >>> https://twitter.com/snntrable > > >>> > > >>> https://github.com/knaufk > > >>> > > >> > > >> > > >> -- > > >> > > >> Best, > > >> Benchao Li > > >> > > > > > > > > > > > > -- > Best, Jingsong Lee > -- Best, Benchao Li