Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to keep some hierarchy in the properties.
That is why you have key.format.type.

I also considered exactly what you are suggesting (prefixing with
connector or kafka). I should have listed that under Options/Rejected
alternatives.

I agree that timestamp, key.*, and value.* are connector properties. The
reason I suggested not adding that prefix in the first version is that
all the properties in the WITH section are actually connector properties.
Even format is, in the end, a connector property, as some of the sources
might not have a format, imo. The benefit of not adding the prefix is that
it keeps the keys a bit shorter. Imagine prefixing all the properties with
connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it, though, if this is the preferred approach in the
community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be
`value.fields-include`, which I think you also suggested in the comment,
right?

As for CAST vs declaring the output type of a computed column: I think
it's better not to use CAST, but to declare the type of the expression and
later on infer the output type of SYSTEM_METADATA from it. The reason is
that this way it will be easier to implement e.g. filter push downs when
working with the native types of the source. For example, in the case of
Kafka's offset, I think it's better to push down a long rather than a
string. This would let us push down an expression such as
offset > 12345 && offset < 59382. Otherwise we would have to push down
cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think
we need to introduce the type for computed columns anyway to support
functions that infer their output type based on the expected return type.
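
To make the type argument a bit more concrete, here is a minimal Python
sketch. It is not Flink code; the offsets and the helper names are made up
for illustration. It only shows that the range filter gives the right
answer when the offset is compared as a number, while a string-typed
offset needs a cast in every comparison before the predicate is correct.

```python
# Hypothetical offsets, as a source would expose them natively (as longs).
offsets = [130, 12346, 20000, 59381, 600000]


def filter_native(values, lower, upper):
    """offset > lower && offset < upper, evaluated on the native numeric type."""
    return [v for v in values if lower < v < upper]


def filter_string_no_cast(values, lower, upper):
    """Same predicate on string offsets without a cast.

    String comparison is lexicographic, so this can return wrong rows.
    """
    return [v for v in values if str(lower) < v < str(upper)]


def filter_string_with_cast(values, lower, upper):
    """cast(offset, long) > lower && cast(offset, long) < upper."""
    return [v for v in values if lower < int(v) < upper]


native = filter_native(offsets, 12345, 59382)

as_strings = [str(o) for o in offsets]
no_cast = filter_string_no_cast(as_strings, 12345, 59382)
with_cast = filter_string_with_cast(as_strings, 12345, 59382)

print(native)     # numeric comparison
print(no_cast)    # lexicographic comparison, also lets "130" through
print(with_cast)  # correct again, but only because of the per-row cast
```

With a declared long type the source can work with the first form
directly; with a string it would have to understand and evaluate the cast
as part of the pushed-down predicate.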

As for the computed column push down: yes, SYSTEM_METADATA would have to
be pushed down to the source. If that is not possible, the planner should
fail. As far as I know, computed column push down will be part of the
source rework, won't it? ;)

As for the persisted computed column: I think it is completely
orthogonal. In my current proposal you can also partition by a computed
column. The difference between using a UDF in PARTITIONED BY and
partitioning by a computed column is that when you partition by a
computed column, this column must also be computed when reading the
table. If you use a UDF in PARTITIONED BY, the expression is
computed only when inserting into the table.
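
A toy Python model of that difference, just to illustrate when the
expression is evaluated. This is not how Flink implements it; the classes,
the `bucket` function, and the column name `b` are all hypothetical.

```python
def bucket(user_id):
    """Hypothetical partitioning expression."""
    return user_id % 4


class PartitionedByUdf:
    """Models PARTITIONED BY (bucket(user_id)): evaluated only on insert."""

    def __init__(self):
        self.partitions = {}
        self.evaluations = 0

    def insert(self, row):
        self.evaluations += 1
        self.partitions.setdefault(bucket(row["user_id"]), []).append(row)

    def read(self):
        # The expression is not part of the schema, so nothing is computed.
        return [dict(r) for rows in self.partitions.values() for r in rows]


class PartitionedByComputedColumn:
    """Models b AS bucket(user_id) with PARTITIONED BY (b)."""

    def __init__(self):
        self.partitions = {}
        self.evaluations = 0

    def insert(self, row):
        self.evaluations += 1
        self.partitions.setdefault(bucket(row["user_id"]), []).append(row)

    def read(self):
        # b is a (non-persisted) column of the table, so it is computed again.
        out = []
        for rows in self.partitions.values():
            for r in rows:
                self.evaluations += 1
                out.append({**r, "b": bucket(r["user_id"])})
        return out


rows = [{"user_id": 1}, {"user_id": 5}, {"user_id": 6}]
udf_table, cc_table = PartitionedByUdf(), PartitionedByComputedColumn()
for r in rows:
    udf_table.insert(r)
    cc_table.insert(r)

udf_rows = udf_table.read()
cc_rows = cc_table.read()
print(udf_table.evaluations, cc_table.evaluations)
```

After three inserts and one read, the UDF variant has evaluated the
expression once per inserted row, while the computed column variant has
evaluated it once per inserted row and once more per row read.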

Hope this answers some of your questions. Looking forward to further
suggestions.

Best,

Dawid



On 02/03/2020 05:18, Jark Wu wrote:
> Hi,
>
> Thanks Dawid for starting such a great discussion. Reading metadata and
> key-part information from source is an important feature for streaming
> users.
>
> In general, I agree with the proposal of the FLIP.
> I will leave my thoughts and comments here:
>
> 1) +1 to use connector properties instead of introducing HEADER keyword as
> the reason you mentioned in the FLIP.
> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a
> section to explain what's the relationship between them.
>     Do their concepts conflict? Could INSERT PARTITION be used on the
> PARTITIONED table in this FLIP?
> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the
> newly introduced properties more hierarchical?
>     For example, "timestamp" => "connector.timestamp"? (actually, I prefer
> "kafka.timestamp" which is another improvement for properties FLINK-12557)
>     A single "timestamp" in properties may mislead users that the field is
> a rowtime attribute.
>
> I also left some minor comments in the FLIP.
>
> Thanks,
> Jark
>
>
>
> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi,
>>
>> I would like to propose an improvement that would enable reading table
>> columns from different parts of source records. Besides the main payload,
>> the majority (if not all) of the sources expose additional information. It
>> can be simply read-only metadata such as offset or ingestion time, or
>> readable and writable parts of the record that contain data but
>> additionally serve different purposes (partitioning, compaction etc.),
>> e.g. key or timestamp in Kafka.
>>
>> We should make it possible to read and write data from all of those
>> locations. In this proposal I discuss reading partitioning data; for
>> completeness, it also discusses partitioning when writing
>> data out.
>>
>> I am looking forward to your comments.
>>
>> You can access the FLIP here:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>
>> Best,
>>
>> Dawid
>>
>>
>>

