Thanks Timo for working on FLIP-107. Agreed, I think that is a good
approach. I'll spend more time later to form a detailed FLIP.
Best,
Jingsong

On Wed, Sep 2, 2020 at 7:12 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jingsong,
>
> I haven't looked at your proposal yet, but I think it makes sense to
> have a separate FLIP for the partitioning topic. I'm currently working
> on an update to FLIP-107 and would suggest removing the partitioning
> topic there. FLIP-107 will only focus on accessing metadata and
> expressing key/value formats.
>
> What do you think?
>
> Regards,
> Timo
>
> On 01.09.20 07:39, Jingsong Li wrote:
> > Thanks Konstantin and Benchao for your responses.
> >
> > If we want to push the implementation forward, it should be a FLIP.
> >
> > My original intention was to unify the partition definitions for
> > batches and streams:
> >
> > - What is a "PARTITION" on a table? Partitions define the physical
> > storage form of a table. Different partitions should be stored in
> > different places, with good isolation between them. Therefore, at the
> > connector level, we can operate on a partition separately.
> > - For a Kafka table, the partition is a finite integer value,
> > depending on how many partitions the Kafka topic has.
> > - For a Filesystem table, the partition is a directory structure,
> > which may be multi-level. It can be of any type, because any type can
> > be converted to a string.
> >
> > So, how do we generate a partition value? It can be mapped directly
> > from a field (identity). Alternatively, the partition value can be
> > generated by a function (transform); this is what I want to discuss
> > with you.
> >
> > ## To Konstantin:
> >
> >> 1) Does the PARTITION BY clause only have an effect for sink tables,
> >> defining how data should be partitioned in the sink system, or does
> >> it also make a difference for source tables? My understanding is
> >> that it also makes a difference for source tables (e.g. if the
> >> source system supports partition pruning). I suppose, for source
> >> tables, Flink does not check/enforce this, but trusts that the
> >> partitioning information is correct?!
> >
> > Yes, it also works for sources. In my understanding, partition
> > pruning is actually a kind of filter push-down: the source is
> > responsible for reading only the data of specific partitions.
> >
> >> 2) I suppose it is up to the connector implementation whether/how to
> >> interpret the partition information. How will this work?
> >
> > I think it depends on the interface and implementation. For example,
> > for Kafka, partition fields can be defined in the DDL, and filtering
> > conditions on partition fields can be expressed in the query. The
> > Kafka source can then act on these filter conditions.
> >
> >> 3) For Kafka, I suppose, the most common partitioning strategy is by
> >> key. FLIP-107 contains a proposal on how to define the key (which
> >> fields of the schema should become part of the key) when writing to
> >> Kafka via Flink SQL. How does this relate to the PARTITION BY
> >> clause?
> >
> > How a partition is defined should have nothing to do with whether the
> > field it refers to is a key. Users can refer to any fields in the
> > schema.
> >
> > ## To Benchao:
> >
> > I think you are talking about shuffle-by at the computation level?
> > That's a good topic, but this discussion is mainly about partitions
> > in connector storage.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <libenc...@apache.org> wrote:
> >
> >> Hi Jingsong,
> >>
> >> Thanks for bringing up this discussion. I generally like this idea.
> >> I'd like to add some cases we met in our scenarios.
> >>
> >> ## Source Partition By
> >> There is a use case where users want to do some lookup in a UDF,
> >> much like a dimension table. It's common for them to cache some
> >> lookup results in their UDF.
> >> If there is no 'partition by' from the source, they may need to
> >> cache more data in each subtask. Actually, what they need is `keyBy`
> >> from the DataStream world. We have already supported this feature in
> >> our production.
> >>
> >> ## Sink Partition By
> >> Recently we also received some requirements for this feature. Users
> >> want to add their own custom sink, and need the data shuffled before
> >> the sink. They will do some processing on the data sharing the same
> >> partition key.
> >>
> >> As an addition to the 'Source Partition By' semantic: actually, it
> >> is not enough for current use cases. The more common way to do this
> >> is to add partition-by semantics to 'view', so that users can do the
> >> 'keyBy' multiple times in one query.
> >>
> >> I have no strong opinions about these features; I just want to add
> >> some use cases and would like to hear more opinions.
> >>
> >>
> >> Konstantin Knauf <kna...@apache.org> wrote on Mon, Aug 31, 2020 at
> >> 7:09 PM:
> >>
> >>> Hi Jingsong,
> >>>
> >>> I would like to understand this FLIP (?) a bit better, but I am
> >>> missing some background, I believe. So, some basic questions:
> >>>
> >>> 1) Does the PARTITION BY clause only have an effect for sink
> >>> tables, defining how data should be partitioned in the sink system,
> >>> or does it also make a difference for source tables? My
> >>> understanding is that it also makes a difference for source tables
> >>> (e.g. if the source system supports partition pruning). I suppose,
> >>> for source tables, Flink does not check/enforce this, but trusts
> >>> that the partitioning information is correct?!
> >>>
> >>> 2) I suppose it is up to the connector implementation whether/how
> >>> to interpret the partition information. How will this work?
> >>>
> >>> 3) For Kafka, I suppose, the most common partitioning strategy is
> >>> by key.
> >>> FLIP-107 contains a proposal on how to define the key (which
> >>> fields of the schema should become part of the key) when writing to
> >>> Kafka via Flink SQL. How does this relate to the PARTITION BY
> >>> clause?
> >>>
> >>> Thanks,
> >>>
> >>> Konstantin
> >>>
> >>>
> >>> On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> ## Motivation
> >>>>
> >>>> FLIP-63 [1] introduced initial support for the PARTITIONED BY
> >>>> clause, to the extent that it lets us support Hive's partitioning.
> >>>> But this partition definition is completely specific to Hive/file
> >>>> systems. With the continuous development of the system, there are
> >>>> new requirements:
> >>>>
> >>>> - FLIP-107 [2] requirements: A common requirement is to create a
> >>>> custom partitioning of the data. We should have a way to
> >>>> specify/compute the target partition/shard for
> >>>> Kinesis/Pravega/Pulsar. In those cases it would be the only way to
> >>>> control partitioning.
> >>>>
> >>>> - Apache Iceberg partitioning [3] requirements: Iceberg produces
> >>>> partition values by taking a column value and optionally
> >>>> transforming it. Iceberg is responsible for converting event_time
> >>>> into event_date, and keeps track of the relationship.
> >>>>
> >>>> So I think it is better to introduce partitioning strategies to
> >>>> Flink. These partitioning strategies are similar to partitioning
> >>>> in traditional databases like Oracle [4].
> >>>>
> >>>> ## Proposed Partitioning DDL
> >>>>
> >>>> Hash Partitioning Tables:
> >>>>
> >>>> CREATE TABLE kafka_table (
> >>>>   id STRING,
> >>>>   name STRING,
> >>>>   `date` DATE ... )
> >>>> PARTITIONED BY (HASH(id, name))
> >>>>
> >>>> Explicit Partitioning Tables (introduced in FLIP-63):
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>   name STRING,
> >>>>   `date` DATE ... )
> >>>> PARTITIONED BY (`date`)
> >>>>
> >>>> (Can we remove the brackets when there is only a single-level
> >>>> partition? => "PARTITIONED BY HASH(id, name)" and
> >>>> "PARTITIONED BY `date`")
> >>>>
> >>>> Composite Partitioning Tables:
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>   name STRING,
> >>>>   `date` DATE
> >>>>   ... )
> >>>> PARTITIONED BY (year(`date`), month(`date`), day(`date`))
> >>>>
> >>>> Composite Explicit Partitioning Tables (introduced in FLIP-63):
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>   name STRING,
> >>>>   `date` DATE,
> >>>>   y STRING,
> >>>>   m STRING,
> >>>>   d STRING,
> >>>>   ... )
> >>>> PARTITIONED BY (y, m, d)
> >>>>
> >>>> ## Rejected Alternatives
> >>>>
> >>>> Composite Partitioning Tables DDL like Oracle:
> >>>>
> >>>> CREATE TABLE invoices (
> >>>>   invoice_no NUMBER NOT NULL,
> >>>>   invoice_date DATE NOT NULL,
> >>>>   comments VARCHAR2(500))
> >>>> PARTITION BY RANGE (invoice_date)
> >>>> SUBPARTITION BY HASH (invoice_no)
> >>>> SUBPARTITIONS 8 (
> >>>>   PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001',
> >>>>     'DD/MM/YYYY')),
> >>>>   PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001',
> >>>>     'DD/MM/YYYY')),
> >>>>   PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001',
> >>>>     'DD/MM/YYYY')),
> >>>>   PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002',
> >>>>     'DD/MM/YYYY')));
> >>>>
> >>>> - First, multi-level partitioning is a common thing in big data
> >>>> systems.
> >>>> - Second, the "SUBPARTITIONS" syntax is not only more complex, but
> >>>> also completely different from big data systems such as Hive. Big
> >>>> data systems need to specify less partition information than
> >>>> traditional ones, so it is more natural to write all partitions in
> >>>> one bracket.
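The two ways of producing a partition value discussed in this thread — taking a column value directly (identity, FLIP-63 style) or deriving it through a transform such as `HASH(id, name)` or `year(date)` (FLIP-107 / Iceberg style) — can be sketched in plain Python. This is an illustrative model only; the function names below are hypothetical and are not Flink or Iceberg APIs.

```python
import hashlib
from datetime import date

def identity(value):
    """FLIP-63 style: the column value itself is the partition value."""
    return value

def bucket_hash(num_buckets: int, *values: str) -> int:
    """HASH(id, name) style: deterministically map key columns to a bucket."""
    digest = hashlib.md5("|".join(values).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets

def year(d: date) -> int:
    return d.year

def month(d: date) -> int:
    return d.month

def day(d: date) -> int:
    return d.day

# A clause like PARTITIONED BY (year(`date`), month(`date`), day(`date`))
# would derive a composite partition value such as (2020, 9, 2) from a
# single DATE column, while the system keeps track of the column-to-
# partition relationship so it can still prune partitions on that column.
row_date = date(2020, 9, 2)
composite_partition = (year(row_date), month(row_date), day(row_date))
print(composite_partition)  # (2020, 9, 2)

# HASH(id, name) maps each row to one of a fixed number of buckets,
# e.g. Kafka topic partitions.
print(bucket_hash(8, "id-1", "alice"))  # some bucket in [0, 8)
```

The key design point mirrored here is that the transform, not the raw column, defines the partition, which is what distinguishes this proposal from the FLIP-63 identity-only model.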
> >>>>
> >>>> ## Other Interface changes
> >>>>
> >>>> It can be imagined that this change will involve many
> >>>> Catalog/Table related interfaces, and it will be necessary to
> >>>> replace the previous `List<String> partitionKeys` with
> >>>> partitioning strategies.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> >>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >>>> [3] http://iceberg.apache.org/partitioning/
> >>>> [4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes
> >>>>
> >>>> Best,
> >>>> Jingsong
> >>>
> >>>
> >>> --
> >>>
> >>> Konstantin Knauf
> >>>
> >>> https://twitter.com/snntrable
> >>>
> >>> https://github.com/knaufk
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >
> >

--
Best,
Jingsong Lee