Hi Jingsong,

Thanks for the clarification, and sorry for misunderstanding your original
intention.
What I was talking about is indeed another topic; we can leave it for the
future and see whether other people have the same scenarios.

Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Sep 3, 2020 at 10:56 AM:

> Thanks Timo for working on FLIP-107.
>
> Agreed, I think it is good.
> I'll spend more time later to write up a detailed FLIP.
>
> Best,
> Jingsong
>
> On Wed, Sep 2, 2020 at 7:12 PM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Jingsong,
> >
> > I haven't looked at your proposal yet, but I think it makes sense to have
> > a separate FLIP for the partitioning topic. I'm currently working on an
> > update to FLIP-107 and would suggest removing the partitioning topic
> > there. FLIP-107 will only focus on accessing metadata and expressing
> > key/value formats.
> >
> > What do you think?
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 07:39, Jingsong Li wrote:
> > > Thanks Konstantin and Benchao for your response.
> > >
> > > If we need to push forward the implementation, it should be a FLIP.
> > >
> > > My original intention was to unify the partition definitions for
> > > batches and streams:
> > >
> > > - What is a "PARTITION" on a table? Partitions define the physical
> > > storage form of a table. Different partitions should be stored in
> > > different places, with good isolation between them. Therefore, at the
> > > connector level, we can operate on each partition separately.
> > > - For the Kafka table, the partition is a finite integer value,
> > > depending on how many partitions the Kafka topic has.
> > > - For the Filesystem table, the partition is a directory structure,
> > > which may be multi-level. It can be of any type, because any type can
> > > be converted to a string.
> > >
> > > So, how do we generate a partition value? It can be directly mapped
> > > from a field (identity). In addition, the partition value can also be
> > > generated by a function (transform); this is what I want to discuss
> > > with you.
> > >
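[To make the identity/transform distinction above concrete, here is a minimal Python sketch. It is illustrative only, not Flink code; the function names are assumptions.]

```python
from datetime import datetime

# Identity: the partition value is the field value itself.
def identity(value):
    return str(value)

# Transform: the partition value is derived from a field, e.g. deriving
# an event_date partition from an event_time column, as Iceberg does.
def day(event_time: datetime) -> str:
    return event_time.strftime("%Y-%m-%d")

print(identity("CN"))                     # partition by the raw field value
print(day(datetime(2020, 9, 3, 10, 56)))  # partition by a derived value
```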
> > > ## To Konstantin:
> > >
> > >> 1) Does the PARTITION BY clause only have an effect for sink tables,
> > >> defining how data should be partitioned in the sink system, or does
> > >> it also make a difference for source tables? My understanding is that
> > >> it also makes a difference for source tables (e.g. if the source
> > >> system supports partition pruning). I suppose, for source tables,
> > >> Flink does not check/enforce this, but trusts that the partitioning
> > >> information is correct?!
> > >
> > > Yes, it also works for sources. In my understanding, partition
> > > pruning is actually a kind of filter push-down: the source is
> > > responsible for reading only the data of specific partitions.
> > >
> > >> 2) I suppose it is up to the connector implementation whether/how to
> > >> interpret the partition information. How will this work?
> > >
> > > I think it depends on the interface and implementation. For Kafka,
> > > for example, partition fields can be defined in the DDL, and filter
> > > conditions on those partition fields can be expressed in the query.
> > > The Kafka source can then act on these filter conditions.
> > >
> > >> 3) For Kafka, I suppose, the most common partitioning strategy is by
> > >> key. FLIP-107 contains a proposal on how to define the key (which
> > >> fields of the schema should become part of the key) when writing to
> > >> Kafka via Flink SQL. How does this relate to the PARTITION BY clause?
> > >
> > > How a partition is defined should have nothing to do with whether
> > > the field it refers to is a key. Users can refer to any fields in the
> > > schema.
> > >
> > > ## To Benchao,
> > >
> > > I think what you are describing is shuffle-by at the computation
> > > level? That's a good topic, but this discussion is mainly about
> > > partitions in connector storage.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <libenc...@apache.org> wrote:
> > >
> > >> Hi Jingsong,
> > >>
> > >> Thanks for bringing up this discussion. I like this idea generally.
> > >> I'd like to add some cases we met in our scenarios.
> > >>
> > >> ## Source Partition By
> > >> There is a use case where users want to do lookups in a UDF, much
> > >> like a dimension table. It's common for them to cache some lookup
> > >> results in their UDF. If there is no 'partition by' from the source,
> > >> they may need to cache more data in each subtask.
> > >> Actually, what they need is `keyBy` from the DataStream world.
> > >> We already support this feature in our production environment.
> > >>
> > >> ## Sink Partition By
> > >> Recently we also received some requests for this feature.
> > >> Users want to add their own custom sink, and need the data shuffled
> > >> before the sink. They then do some processing on the data that shares
> > >> the same partition key.
> > >>
> > >> As an addition to the 'Source Partition By' semantics: actually, it's
> > >> not enough for the current use cases. The more common way to do this
> > >> is to add partition-by semantics to a 'view', so that users can do
> > >> the 'keyBy' multiple times in one query.
> > >>
> > >> I have no strong opinions on these features; I'm just adding some
> > >> use cases and would like to hear more opinions.
> > >>
> > >>
> > >> Konstantin Knauf <kna...@apache.org> wrote on Mon, Aug 31, 2020 at 7:09 PM:
> > >>
> > >>> Hi Jingsong,
> > >>>
> > >>> I would like to understand this FLIP (?) a bit better, but I am
> > >>> missing some background, I believe. So, some basic questions:
> > >>>
> > >>> 1) Does the PARTITION BY clause only have an effect for sink tables,
> > >>> defining how data should be partitioned in the sink system, or does
> > >>> it also make a difference for source tables? My understanding is that
> > >>> it also makes a difference for source tables (e.g. if the source
> > >>> system supports partition pruning). I suppose, for source tables,
> > >>> Flink does not check/enforce this, but trusts that the partitioning
> > >>> information is correct?!
> > >>>
> > >>> 2) I suppose it is up to the connector implementation whether/how to
> > >>> interpret the partition information. How will this work?
> > >>>
> > >>> 3) For Kafka, I suppose, the most common partitioning strategy is by
> > >>> key. FLIP-107 contains a proposal on how to define the key (which
> > >>> fields of the schema should become part of the key) when writing to
> > >>> Kafka via Flink SQL. How does this relate to the PARTITION BY clause?
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Konstantin
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> ## Motivation
> > >>>>
> > >>>> FLIP-63 [1] introduced initial support for the PARTITIONED BY
> > >>>> clause, to an extent that lets us support Hive's partitioning.
> > >>>> But this partition definition is completely specific to Hive/file
> > >>>> systems. With the continuous development of the system, new
> > >>>> requirements have emerged:
> > >>>>
> > >>>> - FLIP-107 [2] requirements: A common requirement is to create a
> > >>>> custom partitioning of the data. We should have a way to
> > >>>> specify/compute the target partition/shard for
> > >>>> Kinesis/Pravega/Pulsar. In those cases it would be the only way to
> > >>>> control partitioning.
> > >>>>
> > >>>> - Apache Iceberg partitioning [3] requirements: Iceberg produces
> > >>>> partition values by taking a column value and optionally
> > >>>> transforming it. Iceberg is responsible for converting event_time
> > >>>> into event_date, and keeps track of the relationship.
> > >>>>
> > >>>> So I think it is better to introduce partitioning strategies to
> > >>>> Flink. These partitioning strategies are similar to partitioning in
> > >>>> traditional databases like Oracle [4].
> > >>>>
> > >>>> ## Proposed Partitioning DDL
> > >>>>
> > >>>> Hash Partitioning Tables:
> > >>>>
> > >>>> CREATE TABLE kafka_table (
> > >>>>    id STRING,
> > >>>>    name STRING,
> > >>>>    `date` DATE ... )
> > >>>> PARTITIONED BY (HASH(id, name))
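[For intuition, a HASH(id, name) strategy could map each row to a Kafka partition roughly as in this hedged Python sketch. The hash function choice is an assumption, not what Flink would actually use.]

```python
import zlib

def hash_partition(num_partitions: int, *key_fields: str) -> int:
    # Combine the declared partition fields (e.g. id, name) into one key
    # and map its hash into the range [0, num_partitions).
    key = "|".join(key_fields).encode("utf-8")
    return zlib.crc32(key) % num_partitions

# With HASH(id, name) and a 4-partition Kafka topic:
print(hash_partition(4, "user-42", "alice"))  # a stable value in 0..3
```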
> > >>>>
> > >>>> Explicit Partitioning Tables (Introduced in FLIP-63):
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>    name STRING,
> > >>>>    `date` DATE ... )
> > >>>> PARTITIONED BY (`date`)
> > >>>>
> > >>>> (Can we remove the parentheses when there is only a single
> > >>>> partition expression? =>
> > >>>> "PARTITIONED BY HASH(id, name)" and "PARTITIONED BY date")
> > >>>>
> > >>>> Composite Partitioning Tables:
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>    name STRING,
> > >>>>    `date` DATE
> > >>>>     ... )
> > >>>> PARTITIONED BY (year(`date`), month(`date`), day(`date`))
> > >>>>
> > >>>> Composite Explicit Partitioning Tables (Introduced in FLIP-63):
> > >>>>
> > >>>> CREATE TABLE fs_table (
> > >>>>    name STRING,
> > >>>>    `date` DATE,
> > >>>>    y STRING,
> > >>>>    m STRING,
> > >>>>    d STRING,
> > >>>>     ... )
> > >>>> PARTITIONED BY (y, m, d)
> > >>>>
> > >>>> ## Rejected Alternatives
> > >>>>
> > >>>> Composite Partitioning Tables DDL like Oracle:
> > >>>>
> > >>>> CREATE TABLE invoices (
> > >>>>    invoice_no    NUMBER NOT NULL,
> > >>>>    invoice_date  DATE   NOT NULL,
> > >>>>    comments      VARCHAR2(500))
> > >>>> PARTITION BY RANGE (invoice_date)
> > >>>> SUBPARTITION BY HASH (invoice_no)
> > >>>> SUBPARTITIONS 8 (
> > >>>>    PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001',
> > >>>> 'DD/MM/YYYY')),
> > >>>>    PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001',
> > >>>> 'DD/MM/YYYY')),
> > >>>>    PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001',
> > >>>> 'DD/MM/YYYY')),
> > >>>>    PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002',
> > >>>> 'DD/MM/YYYY')));
> > >>>>
> > >>>> - First, multi-level partitioning is common in big data systems.
> > >>>> - Second, the "SUBPARTITIONS" syntax is not only more complex, but
> > >>>> also completely different from big data systems such as Hive. Big
> > >>>> data systems need to specify less partition information than
> > >>>> traditional ones, so it is more natural to write all partitions in
> > >>>> one pair of parentheses.
> > >>>>
> > >>>> ## Other Interface changes
> > >>>>
> > >>>> It can be expected that this change will involve many
> > >>>> Catalog/Table-related interfaces, as it will be necessary to
> > >>>> replace the previous `List<String> partitionKeys` with partitioning
> > >>>> strategies.
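[As one way to picture that interface change, the old partition key list could become a list of strategies. This is a hypothetical Python sketch; the class and field names are invented, not the proposed Flink API.]

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class PartitionStrategy:
    # "identity" keeps column values as-is; anything else names a
    # transform such as "year", "month", "day", or "hash".
    transform: str
    columns: Tuple[str, ...]

def describe(strategies):
    # Render the strategies the way they would appear in PARTITIONED BY.
    parts = []
    for s in strategies:
        if s.transform == "identity":
            parts.extend(s.columns)
        else:
            parts.append(f"{s.transform}({', '.join(s.columns)})")
    return "PARTITIONED BY (" + ", ".join(parts) + ")"

# The old List<String> partitionKeys ["y", "m", "d"] becomes:
legacy = [PartitionStrategy("identity", ("y", "m", "d"))]
composite = [PartitionStrategy("year", ("date",)),
             PartitionStrategy("month", ("date",)),
             PartitionStrategy("day", ("date",))]
print(describe(legacy))     # PARTITIONED BY (y, m, d)
print(describe(composite))  # PARTITIONED BY (year(date), month(date), day(date))
```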
> > >>>>
> > >>>> What do you think?
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> > >>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> > >>>> [3] http://iceberg.apache.org/partitioning/
> > >>>> [4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes
> > >>>>
> > >>>> Best,
> > >>>> Jingsong
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>>
> > >>> Konstantin Knauf
> > >>>
> > >>> https://twitter.com/snntrable
> > >>>
> > >>> https://github.com/knaufk
> > >>>
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> > >>
> > >
> > >
> >
> >
>
> --
> Best, Jingsong Lee
>


-- 

Best,
Benchao Li
