Hi,

1. I thought a bit more on how the source would emit the columns and I
now see its not exactly the same as regular columns. I see a need to
elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that. One additional
things I would introduce is an

interface SupportsMetadata {

   boolean supportsMetadata(Set<String> metadataFields);

   TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested
metadata fields. In order not to clash with user defined fields. When
emitting the metadata field I would prepend the column name with
__system_{property_name}. Therefore when requested
SYSTEM_METADATA("partition") the source would append a field
__system_partition to the schema. This would be never visible to the
user as it would be used only for the subsequent computed columns. If
that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would
leave out the STORED computed columns out of the discussion. I don't see
how do they relate to the partitioning. I already put both of those
cases in the document. We can either partition on a computed column or
use a udf in a partioned by clause. I am fine with leaving out the
partitioning by udf in the first version if you still have some concerns.

As for your question Danny. It depends which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you
explained. It would be N = MOD(expr, num). I am not sure though if we
should introduce the PARTITIONS clause. Usually Flink does not own the
data and the partitions are already an intrinsic property of the
underlying source e.g. for kafka we do not create topics, but we just
describe pre-existing pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with
other value.fields and key.fields. Actually that was also my initial
proposal in a first draft I prepared. I changed it afterwards to shorten
the key.

Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:
> Thanks Dawid for bringing up this discussion, I think it is a useful feature ~
>
> About how the metadata outputs from source
>
> I think it is completely orthogonal, computed column push down is another 
> topic, this should not be a blocker but a promotion, if we do not have any 
> filters on the computed column, there is no need to do any pushings; the 
> source node just emit the complete record with full metadata with the 
> declared physical schema, then when generating the virtual columns, we would 
> extract the metadata info and output as full columns(with full schema).
>
> About the type of metadata column
>
> Personally i prefer explicit type instead of CAST, they are symantic 
> equivalent though, explict type is more straight-forward and we can declare 
> the nullable attribute there.
>
> About option A: partitioning based on acomputed column VS option B: 
> partitioning with just a function
>
> From the FLIP, it seems that B's partitioning is just a strategy when writing 
> data, the partiton column is not included in the table schema, so it's just 
> useless when reading from that.
>
> - Compared to A, we do not need to generate the partition column when 
> selecting from the table(but insert into)
> - For A we can also mark the column as STORED when we want to persist that
>
> So in my opition they are orthogonal, we can support both, i saw that 
> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and the 
> partitions are managed under a "tablenamespace", the partition in which the 
> record is stored is partition number N, where N = MOD(expr, num), for your 
> design, which partiton the record would persist ?
>
> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> [2] 
> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>
> Best,
> Danny Chan
> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <dwysakow...@apache.org>,写道:
>> Hi Jark,
>> Ad. 2 I added a section to discuss relation to FLIP-63
>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. Therefore 
>> you have the key.format.type.
>> I also considered exactly what you are suggesting (prefixing with connector 
>> or kafka). I should've put that into an Option/Rejected alternatives.
>> I agree timestamp, key.*, value.* are connector properties. Why I wanted to 
>> suggest not adding that prefix in the first version is that actually all the 
>> properties in the WITH section are connector properties. Even format is in 
>> the end a connector property as some of the sources might not have a format, 
>> imo. The benefit of not adding the prefix is that it makes the keys a bit 
>> shorter. Imagine prefixing all the properties with connector (or if we go 
>> with FLINK-12557: elasticsearch):
>> elasticsearch.key.format.type: csv
>> elasticsearch.key.format.field: ....
>> elasticsearch.key.format.delimiter: ....
>> elasticsearch.key.format.*: ....
>> I am fine with doing it though if this is a preferred approach in the 
>> community.
>> Ad in-line comments:
>> I forgot to update the `value.fields.include` property. It should be 
>> value.fields-include. Which I think you also suggested in the comment, right?
>> As for the cast vs declaring output type of computed column. I think it's 
>> better not to use CAST, but declare a type of an expression and later on 
>> infer the output type of SYSTEM_METADATA. The reason is I think this way it 
>> will be easier to implement e.g. filter push downs when working with the 
>> native types of the source, e.g. in case of Kafka's offset, i think it's 
>> better to pushdown long rather than string. This could let us push 
>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we would 
>> have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. 
>> Moreover I think we need to introduce the type for computed columns anyway 
>> to support functions that infer output type based on expected return type.
>> As for the computed column push down. Yes, SYSTEM_METADATA would have to be 
>> pushed down to the source. If it is not possible the planner should fail. As 
>> far as I know computed columns push down will be part of source rework, 
>> won't it? ;)
>> As for the persisted computed column. I think it is completely orthogonal. 
>> In my current proposal you can also partition by a computed column. The 
>> difference between using a udf in partitioned by vs partitioned by a 
>> computed column is that when you partition by a computed column this column 
>> must be also computed when reading the table. If you use a udf in the 
>> partitioned by, the expression is computed only when inserting into the 
>> table.
>> Hope this answers some of your questions. Looking forward for further 
>> suggestions.
>> Best,
>> Dawid
>>
>>
>> On 02/03/2020 05:18, Jark Wu wrote:
>>> Hi,
>>>
>>> Thanks Dawid for starting such a great discussion. Reaing metadata and
>>> key-part information from source is an important feature for streaming
>>> users.
>>>
>>> In general, I agree with the proposal of the FLIP.
>>> I will leave my thoughts and comments here:
>>>
>>> 1) +1 to use connector properties instead of introducing HEADER keyword as
>>> the reason you mentioned in the FLIP.
>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a
>>> section to explain what's the relationship between them.
>>>    Do their concepts conflict? Could INSERT PARTITION be used on the
>>> PARTITIONED table in this FLIP?
>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the
>>> new introduced properties more hierarchical?
>>>    For example, "timestamp" => "connector.timestamp"? (actually, I prefer
>>> "kafka.timestamp" which is another improvement for properties FLINK-12557)
>>>    A single "timestamp" in properties may mislead users that the field is
>>> a rowtime attribute.
>>>
>>> I also left some minor comments in the FLIP.
>>>
>>> Thanks,
>>> Jark
>>>
>>>
>>>
>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to propose an improvement that would enable reading table
>>>> columns from different parts of source records. Besides the main payload
>>>> majority (if not all of the sources) expose additional information. It
>>>> can be simply a read-only metadata such as offset, ingestion time or a
>>>> read and write  parts of the record that contain data but additionally
>>>> serve different purposes (partitioning, compaction etc.), e.g. key or
>>>> timestamp in Kafka.
>>>>
>>>> We should make it possible to read and write data from all of those
>>>> locations. In this proposal I discuss reading partitioning data, for
>>>> completeness this proposal discusses also the partitioning when writing
>>>> data out.
>>>>
>>>> I am looking forward to your comments.
>>>>
>>>> You can access the FLIP here:
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>>
>>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to