Thanks Timo for working on FLIP-107.

I agree, I think that is a good plan.
I'll spend more time later on writing up a detailed FLIP.
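
To make the Transform idea from my original mail (quoted below) a bit more concrete, here is a rough sketch in Python. It is not Flink code: `PartitionTransform`, `identity`, `year`, `month`, `day`, `hash_of` and `partition_values` are hypothetical names made up for this mail, and CRC32 is only a placeholder for whatever hash function a connector would really use. It only shows how an identity, a hash, and a date-based transform could each turn a row into partition values.

```python
import zlib
from dataclasses import dataclass
from datetime import date
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class PartitionTransform:
    """One entry of a PARTITIONED BY (...) list (hypothetical type, not a Flink API)."""
    name: str
    fields: Tuple[str, ...]
    fn: Callable[..., Any]

    def apply(self, row: Dict[str, Any]) -> Any:
        # Evaluate the transform on the referenced columns of one row.
        return self.fn(*(row[f] for f in self.fields))


def identity(field: str) -> PartitionTransform:
    # PARTITIONED BY (date): the partition value is the column value itself.
    return PartitionTransform(field, (field,), lambda v: v)


def year(field: str) -> PartitionTransform:
    return PartitionTransform(f"year({field})", (field,), lambda d: d.year)


def month(field: str) -> PartitionTransform:
    return PartitionTransform(f"month({field})", (field,), lambda d: d.month)


def day(field: str) -> PartitionTransform:
    return PartitionTransform(f"day({field})", (field,), lambda d: d.day)


def hash_of(*fields: str, num_partitions: int) -> PartitionTransform:
    # PARTITIONED BY (HASH(id, name)): a stable hash over the listed columns,
    # reduced to a partition index. CRC32 is an arbitrary choice for this sketch.
    def fn(*values: Any) -> int:
        key = "\x00".join(str(v) for v in values).encode("utf-8")
        return zlib.crc32(key) % num_partitions

    return PartitionTransform(f"hash({', '.join(fields)})", fields, fn)


def partition_values(transforms: List[PartitionTransform],
                     row: Dict[str, Any]) -> List[Any]:
    # One value per partition level, in declaration order.
    return [t.apply(row) for t in transforms]


row = {"id": "42", "name": "flink", "date": date(2020, 8, 24)}
composite = [year("date"), month("date"), day("date")]
print(partition_values(composite, row))  # [2020, 8, 24]
```

A list of such transforms is roughly what I have in mind as the replacement for `List<String> partitionKeys`: the explicit FLIP-63 partitioning becomes the special case where every entry is `identity(...)`.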

Best,
Jingsong

On Wed, Sep 2, 2020 at 7:12 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jingsong,
>
> I haven't looked at your proposal yet, but I think it makes sense to have a
> separate FLIP for the partitioning topic. I'm currently working on an
> update to FLIP-107 and would suggest removing the partitioning topic
> from it. FLIP-107 will only focus on accessing metadata and expressing
> key/value formats.
>
> What do you think?
>
> Regards,
> Timo
>
> On 01.09.20 07:39, Jingsong Li wrote:
> > Thanks Konstantin and Benchao for your response.
> >
> > If we need to push forward the implementation, it should be a FLIP.
> >
> > My original intention was to unify the partition definitions for batches
> > and streams:
> >
> > - What is "PARTITION" on a table? Partitions define the physical storage
> > form of a table. Different partitions should be stored in different
> > places. There should be good isolation between them. Therefore, in the
> > connector dimension, we can operate on a partition separately.
> > - For the Kafka table, the partition is a finite integer value, depending
> > on how many partitions Kafka has.
> > - For the Filesystem table, the partition is a directory structure, which
> > may be a multi-level structure. And it can be any type, because any type
> > can be converted to a character string.
> >
> > So, how do we generate a partition value? It can be directly mapped to a
> > field (identity). In addition, the partition value can also be generated
> > by a function (Transform). This is what I want to discuss with you.
> >
> > ## To Konstantin:
> >
> >> 1) Does the PARTITION BY clause only have an effect for sink tables,
> >> defining how data should be partitioned in the sink system, or does it
> >> also make a difference for source tables? My understanding is that it
> >> also makes a difference for source tables (e.g. if the source system
> >> supports partition pruning). I suppose, for source tables Flink does not
> >> check/enforce this, but trusts that the partitioning information is
> >> correct?!
> >
> > Yes, it also applies to sources. As I understand it, partition pruning
> > is actually a kind of filter push-down: the source is responsible for
> > reading only the data in the requested partitions.
> >
> >> 2) I suppose it is up to the connector implementation whether/how to
> >> interpret the partition information. How will this work?
> >
> > I think it depends on the interface and implementation. For Kafka, for
> > example, partition fields can be defined in the DDL, and queries can then
> > filter on those partition fields. The Kafka source can act on these
> > filter conditions.
> >
> >> 3) For Kafka, I suppose, the most common partitioning strategy is by
> >> key. FLIP-107 contains a proposal on how to define the key (which fields
> >> of the schema should become part of the key) when writing to Kafka via
> >> Flink SQL. How does this relate to the PARTITION BY clause?
> >
> > How to define a partition should have nothing to do with whether the
> > field it refers to is a key. Users can refer to the fields in the schema.
> >
> > ## To Benchao:
> >
> > I think what you describe is a ShuffleBy at the computation level?
> > That's a good topic, but this discussion is mainly about partitions
> > in connector storage.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <libenc...@apache.org> wrote:
> >
> >> Hi Jingsong,
> >>
> >> Thanks for bringing up this discussion. I like this idea generally.
> >> I'd like to add some cases we met in our scenarios.
> >>
> >> ## Source Partition By
> >> There is a use case where users want to do lookups in a UDF, much like
> >> with a dimension table. It's common for them to cache some lookup
> >> results in their UDF. If there is no 'partition by' from the source,
> >> they may need to cache more data in each subtask.
> >> Actually, what they need is `keyBy` from the DataStream world.
> >> We already support this feature in our production.
> >>
> >> ## Sink Partition By
> >> Recently we also received some requirements about this feature.
> >> Users want to add their custom sink and need the data shuffled before
> >> the sink.
> >> They will do something with the data of the same partition key.
> >>
> >> In addition, the 'Source Partition By' semantic alone is actually not
> >> enough for current use cases.
> >> The more common way to do this is to add the partition by semantic to a
> >> 'view'; then users can do the 'keyBy' multiple times in one query.
> >>
> >> I have no strong opinions about these features. I just wanted to add
> >> some use cases and would like to hear more opinions about this.
> >>
> >>
> >> On Mon, Aug 31, 2020 at 7:09 PM Konstantin Knauf <kna...@apache.org> wrote:
> >>
> >>> Hi Jingsong,
> >>>
> >>> I would like to understand this FLIP (?) a bit better, but I am missing
> >>> some background, I believe. So, some basic questions:
> >>>
> >>> 1) Does the PARTITION BY clause only have an effect for sink tables,
> >>> defining how data should be partitioned in the sink system, or does it
> >>> also make a difference for source tables? My understanding is that it
> >>> also makes a difference for source tables (e.g. if the source system
> >>> supports partition pruning). I suppose, for source tables Flink does
> >>> not check/enforce this, but trusts that the partitioning information is
> >>> correct?!
> >>>
> >>> 2) I suppose it is up to the connector implementation whether/how to
> >>> interpret the partition information. How will this work?
> >>>
> >>> 3) For Kafka, I suppose, the most common partitioning strategy is by
> >>> key. FLIP-107 contains a proposal on how to define the key (which
> >>> fields of the schema should become part of the key) when writing to
> >>> Kafka via Flink SQL. How does this relate to the PARTITION BY clause?
> >>>
> >>> Thanks,
> >>>
> >>> Konstantin
> >>>
> >>>
> >>>
> >>> On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> ## Motivation
> >>>>
> > >>>> FLIP-63 [1] introduced initial support for the PARTITIONED BY clause,
> > >>>> to an extent that lets us support Hive's partitioning.
> > >>>> But this partition definition is completely specific to Hive/file
> > >>>> systems. With the continued development of the system, there are new
> > >>>> requirements:
> >>>>
> > >>>> - FLIP-107 [2] requirements: A common requirement is to create a
> > >>>> custom partitioning of the data. We should have a way to
> > >>>> specify/compute the target partition/shard for
> > >>>> Kinesis/Pravega/Pulsar. In those cases it would be the only way to
> > >>>> control partitioning.
> > >>>>
> > >>>> - Apache Iceberg partitioning [3] requirements: Iceberg produces
> > >>>> partition values by taking a column value and optionally
> > >>>> transforming it. Iceberg is responsible for converting event_time
> > >>>> into event_date, and keeps track of the relationship.
> >>>>
> > >>>> So I think it is better to introduce partitioning strategies to Flink.
> > >>>> These partitioning strategies are similar to partitioning in
> > >>>> traditional databases like Oracle [4].
> >>>>
> >>>> ## Proposed Partitioning DDL
> >>>>
> >>>> Hash Partitioning Tables:
> >>>>
> >>>> CREATE TABLE kafka_table (
> >>>>    id STRING,
> >>>>    name STRING,
> > >>>>    date DATE ... )
> >>>> PARTITIONED BY (HASH(id, name))
> >>>>
> >>>> Explicit Partitioning Tables (Introduced in FLIP-63):
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>    name STRING,
> > >>>>    date DATE ... )
> >>>> PARTITIONED BY (date)
> >>>>
> > >>>> (Can we remove the brackets when there is only a single-layer
> > >>>> partition? => "PARTITIONED BY HASH(id, name)" and
> > >>>> "PARTITIONED BY date")
> >>>>
> >>>> Composite Partitioning Tables:
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>    name STRING,
> > >>>>    date DATE,
> >>>>     ... )
> >>>> PARTITIONED BY (year(date), month(date), day(date))
> >>>>
> >>>> Composite Explicit Partitioning Tables (Introduced in FLIP-63):
> >>>>
> >>>> CREATE TABLE fs_table (
> >>>>    name STRING,
> > >>>>    date DATE,
> > >>>>    y STRING,
> > >>>>    m STRING,
> > >>>>    d STRING,
> >>>>     ... )
> >>>> PARTITIONED BY (y, m, d)
> >>>>
> >>>> ## Rejected Alternatives
> >>>>
> >>>> Composite Partitioning Tables DDL like Oracle:
> >>>>
> >>>> CREATE TABLE invoices (
> >>>>    invoice_no    NUMBER NOT NULL,
> >>>>    invoice_date  DATE   NOT NULL,
> >>>>    comments      VARCHAR2(500))
> >>>> PARTITION BY RANGE (invoice_date)
> >>>> SUBPARTITION BY HASH (invoice_no)
> >>>> SUBPARTITIONS 8 (
> >>>>    PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001',
> >>>> 'DD/MM/YYYY')),
> >>>>    PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001',
> >>>> 'DD/MM/YYYY')),
> >>>>    PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001',
> >>>> 'DD/MM/YYYY')),
> > >>>>    PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002',
> > >>>> 'DD/MM/YYYY')));
> >>>>
> > >>>> - First, multi-level partitioning is common in big data systems.
> > >>>> - Second, the "SUBPARTITIONS" syntax is not only more complex, but
> > >>>> also completely different from big data systems such as Hive. Big
> > >>>> data systems need to specify less partition information than
> > >>>> traditional ones, so it is more natural to write all partitions in
> > >>>> one bracket.
> >>>>
> >>>> ## Other Interface changes
> >>>>
> >>>> It can be imagined that this change will involve many Catalog / Table
> >>>> related interfaces, and it is necessary to replace the previous
> >>>> `List<String> partitionKeys` with `partitioning strategies`.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> [1]
> >>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> >>>> [2]
> >>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >>>> [3]http://iceberg.apache.org/partitioning/
> >>>> [4]https://oracle-base.com/articles/8i/partitioned-tables-and-indexes
> >>>>
> >>>> Best,
> >>>> Jingsong
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Konstantin Knauf
> >>>
> >>> https://twitter.com/snntrable
> >>>
> >>> https://github.com/knaufk
> >>>
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
> >
> >
>
>

-- 
Best, Jingsong Lee
