Hi Jingsong,

I haven't looked at your proposal yet, but I think it makes sense to have a separate FLIP for the partitioning topic. I'm currently working on an update to FLIP-107 and would suggest removing the partitioning topic from it. FLIP-107 will then focus only on accessing metadata and expressing key/value formats.

What do you think?

Regards,
Timo

On 01.09.20 07:39, Jingsong Li wrote:
Thanks Konstantin and Benchao for your response.

If we need to push forward the implementation, it should be a FLIP.

My original intention was to unify the partition definitions for batches
and streams:

- What is "PARTITION" on a table? Partitions define the physical storage
form of a table. Different partitions should be stored in different places.
There should be good isolation between them. Therefore, in the connector
dimension, we can operate on a partition separately.
- For the Kafka table, the partition is a finite integer value, depending
on how many partitions Kafka has.
- For the Filesystem table, the partition is a directory structure, which
may be a multi-level structure. And it can be any type, because any type
can be converted to a character string.
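
As a rough illustration of the Filesystem case, here is a minimal Python sketch that renders multi-level partition values as a directory path. The Hive-style `column=value` layout, the helper name `partition_path`, and the column names `dt` and `hour` are assumptions made for this example, not something fixed by this thread.

```python
from datetime import date


def partition_path(partition_values):
    """Render ordered (column, value) pairs as a multi-level directory path."""
    # Every value goes through str(), mirroring the point that any type
    # can be converted to a character string.
    return "/".join(f"{column}={value}" for column, value in partition_values)


# Hypothetical two-level partition: a DATE column and an INT column.
path = partition_path([("dt", date(2020, 9, 1)), ("hour", 7)])
print(path)  # dt=2020-09-01/hour=7
```

A Kafka partition, by contrast, would just be a single integer in a fixed range, so no path rendering is needed there.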

So, how do we generate a partition value? It can be mapped directly to a
field (identity). Alternatively, the partition value can be generated by
a function (transform); this is what I want to discuss with you.
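
To make the two options concrete, the following Python sketch contrasts an identity mapping with a transform. The function names `identity` and `year` and the sample row are hypothetical and only illustrate the idea; they are not a proposed Flink API.

```python
from datetime import date


def identity(column):
    """Partition value is the column value itself."""
    return lambda row: row[column]


def year(column):
    """Partition value is computed from the column by a transform."""
    return lambda row: row[column].year


# Hypothetical row with a DATE column.
row = {"name": "a", "date": date(2020, 9, 1)}

by_field = identity("date")
by_transform = year("date")
print(by_field(row), by_transform(row))  # 2020-09-01 2020
```

In both cases the connector only sees the resulting partition value; the difference is whether the user has to materialize it as a column first.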

## To Konstantin:

1) Does the PARTITION BY clause only have an effect for sink tables,
defining how data should be partitioned in the sink system, or does it also
make a difference for source tables? My understanding is that it also makes
a difference for source tables (e.g. if the source system supports
partition pruning). I suppose, for source tables Flink does not
check/enforce this, but trusts that the partitioning information is
correct?!

Yes, it also works for sources. In my understanding, partition pruning
is actually a kind of filter push-down: the source is responsible for
reading only the data that belongs to the requested partitions.
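
A minimal Python sketch of that idea, assuming the source can list its partitions and receives the pushed-down filter as a predicate. The helper `prune_partitions` and the `dt` partition column are hypothetical.

```python
def prune_partitions(partitions, predicate):
    """Keep only the partitions whose values satisfy the pushed-down filter."""
    return [p for p in partitions if predicate(p)]


# Hypothetical partitions of a table partitioned by a string column `dt`.
partitions = [{"dt": "2020-08-30"}, {"dt": "2020-08-31"}, {"dt": "2020-09-01"}]

# Corresponds to a query filter such as: WHERE dt >= '2020-08-31'
selected = prune_partitions(partitions, lambda p: p["dt"] >= "2020-08-31")
print(selected)
```

The source would then read only the selected partitions, which is why the declared partitioning has to match the real layout.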

2) I suppose it is up to the connector implementation whether/how to
interpret the partition information. How will this work?

I think it depends on the interface and implementation. For Kafka, for
example, partition fields can be defined in the DDL, and a query can then
filter on those partition fields. The Kafka source can act on these
filter conditions.

3) For Kafka, I suppose, the most common partitioning strategy is by key.
FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITION BY clause?

How to define a partition should have nothing to do with whether the field
it refers to is a key. Users can refer to the fields in the schema.

## To Benchao:

I think what you describe is about ShuffleBy at the computation level?
That's a good topic, but this discussion is mainly about partitions in
connector storage.

Best,
Jingsong

On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <libenc...@apache.org> wrote:

Hi Jingsong,

Thanks for bringing up this discussion. I like this idea generally.
I'd like to add some cases we met in our scenarios.

## Source Partition By
There is a use case where users want to do lookups in a UDF, very much
like a dimension table. It's common for them to cache lookup results in
their UDF. If there is no 'partition by' from the source, they may need to
cache more data in each subtask.
Actually, what they need is `keyBy` from the DataStream world.
We already support this feature in production.

## Sink Partition By
Recently we also received some requirements for this feature.
Users want to add their own custom sink and need the data shuffled before
the sink.
They then do some processing on the data that shares a partition key.

An addition to the 'Source Partition By' semantics: it is actually not
enough for current use cases.
The more common way to do this is to add 'partition by' semantics to a
'view', so that users can do the 'keyBy' multiple times in one query.

I have no strong opinions about these features; I just wanted to add some
use cases and would like to hear more opinions about this.


On Mon, Aug 31, 2020 at 7:09 PM Konstantin Knauf <kna...@apache.org> wrote:

Hi Jingsong,

I would like to understand this FLIP (?) a bit better, but I am missing
some background, I believe. So, some basic questions:

1) Does the PARTITION BY clause only have an effect for sink tables,
defining how data should be partitioned in the sink system, or does it
also make a difference for source tables? My understanding is that it also
makes a difference for source tables (e.g. if the source system supports
partition pruning). I suppose, for source tables Flink does not
check/enforce this, but trusts that the partitioning information is
correct?!

2) I suppose it is up to the connector implementation whether/how to
interpret the partition information. How will this work?

3) For Kafka, I suppose, the most common partitioning strategy is by key.
FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITION BY clause?

Thanks,

Konstantin



On Mon, Aug 24, 2020 at 10:54 AM Jingsong Li <jingsongl...@gmail.com>
wrote:

Hi all,

## Motivation

FLIP-63 [1] introduced initial support for the PARTITIONED BY clause, to an
extent that lets us support Hive's partitioning.
But this partition definition is completely specific to Hive/file
systems. With the continuous development of the system, there are new
requirements:

- FLIP-107 [2] requirements: A common requirement is to create a custom
partitioning of the data. We should have a way to specify/compute the
target partition/shard for Kinesis/Pravega/Pulsar. In those cases it would
be the only way to control partitioning.

- Apache Iceberg partitioning [3] requirements: Iceberg produces partition
values by taking a column value and optionally transforming it. Iceberg is
responsible for converting event_time into event_date, and keeps track of
the relationship.

So I think it is better to introduce partitioning strategies to Flink.
The partitioning strategies are similar to partitioning in traditional
databases like Oracle [4].

## Proposed Partitioning DDL

Hash Partitioning Tables:

CREATE TABLE kafka_table (
   id STRING,
   name STRING,
   date DATE ... )
PARTITIONED BY (HASH(id, name))
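
For illustration only, here is a Python sketch of how a HASH(id, name) strategy could map a row to one of a finite number of partitions. The CRC32 hash, the separator byte, and the helper name `hash_partition` are assumptions for this example; a real connector would use its own hash function and partition count.

```python
import zlib


def hash_partition(row, columns, num_partitions):
    """Map the values of the given columns to a partition index."""
    # Join the column values with a separator and hash them with CRC32,
    # which is stable across processes (unlike Python's built-in hash()).
    key = "\x00".join(str(row[c]) for c in columns).encode("utf-8")
    return zlib.crc32(key) % num_partitions


# Hypothetical row and a hypothetical topic with 8 partitions.
row = {"id": "42", "name": "flink"}
partition = hash_partition(row, ["id", "name"], 8)
print(partition)
```

Rows with the same (id, name) always land in the same partition, which is the property the sink relies on.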

Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
   name STRING,
   date DATE ... )
PARTITIONED BY (date)

(Can we remove the brackets when there is only a single-layer partition?
=> "PARTITIONED BY HASH(id, name)" and "PARTITIONED BY date")

Composite Partitioning Tables:

CREATE TABLE fs_table (
   name STRING,
   date DATE
    ... )
PARTITIONED BY (year(date), month(date), day(date))

Composite Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
   name STRING,
   date DATE,
   y STRING,
   m STRING,
   d STRING,
    ... )
PARTITIONED BY (y, m, d)

## Rejected Alternatives

Composite Partitioning Tables DDL like Oracle:

CREATE TABLE invoices (
   invoice_no    NUMBER NOT NULL,
   invoice_date  DATE   NOT NULL,
   comments      VARCHAR2(500))
PARTITION BY RANGE (invoice_date)
SUBPARTITION BY HASH (invoice_no)
SUBPARTITIONS 8 (
   PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
   PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
   PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
   PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));

- First, multi-level partitioning is a common thing in big data systems.
- Second, the "SUBPARTITIONS" syntax is not only more complex, but also
completely different from big data systems such as Hive. Big data systems
need to specify less partition information than traditional ones, so it is
more natural to write all partitions in one bracket.

## Other Interface changes

It can be imagined that this change will involve many Catalog / Table
related interfaces, and it is necessary to replace the previous
`List<String> partitionKeys` with `partitioning strategies`.
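
One possible shape for such a replacement, sketched in Python rather than in the actual Catalog interfaces. The class `PartitionStrategy`, its fields, and the helper `from_partition_keys` are hypothetical names invented for this example; the sketch only shows that the existing list of key names can be expressed as identity strategies.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class PartitionStrategy:
    transform: str            # e.g. "identity", "hash", "year"
    columns: Tuple[str, ...]  # source columns the transform reads


def from_partition_keys(partition_keys: List[str]) -> List[PartitionStrategy]:
    """Express the existing list of key names as identity strategies."""
    return [PartitionStrategy("identity", (key,)) for key in partition_keys]


# PARTITIONED BY (y, m, d) from FLIP-63 keeps working as identity strategies.
legacy = from_partition_keys(["y", "m", "d"])

# PARTITIONED BY (HASH(id, name)) needs the richer representation.
hashed = [PartitionStrategy("hash", ("id", "name"))]
print(legacy, hashed)
```

A representation along these lines would keep the FLIP-63 definitions expressible while leaving room for transforms.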

What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
[3] http://iceberg.apache.org/partitioning/
[4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes

Best,
Jingsong



--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk



--

Best,
Benchao Li



