Hi Dawid,

> connector properties
Could we use "timestamp.field" instead of "timestamp"? This would be more
consistent with "key.fields", and it would avoid misleading users into
thinking that it defines a rowtime attribute (KSQL [1] uses the "timestamp"
property to override ROWTIME information).

> SYSTEM_METADATA(...)
I agree that a SYSTEM_METADATA computed column with a declared return type is
the clearest way to support accessing read-only information.
We may need further discussion about the implementation details, e.g. how
to represent such a computed column (especially its return type) in a Calcite
SqlFunction.
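
To make the question concrete, here is a rough Python sketch (not Calcite
code) of a function whose return type is taken from the declared column type
instead of being inferred from its operands. Everything in it is made up for
illustration: the `ComputedColumn` class, `resolve_return_type`, and the table
of metadata keys and the types they can be read as.

```python
from dataclasses import dataclass

# Hypothetical: metadata keys a source exposes and the SQL types each one
# can be read as. A real connector would have to declare this itself.
SUPPORTED = {
    "offset": {"BIGINT"},
    "timestamp": {"BIGINT", "TIMESTAMP(3)"},
}


@dataclass(frozen=True)
class ComputedColumn:
    name: str
    declared_type: str  # type written in the DDL next to the column name
    metadata_key: str   # argument passed to SYSTEM_METADATA(...)


def resolve_return_type(col: ComputedColumn) -> str:
    """Return the declared type if the source can produce it, else fail."""
    allowed = SUPPORTED.get(col.metadata_key)
    if allowed is None:
        raise ValueError(f"unknown metadata key: {col.metadata_key}")
    if col.declared_type not in allowed:
        raise ValueError(
            f"{col.metadata_key} cannot be read as {col.declared_type}")
    return col.declared_type
```

The point of the sketch is only that the return type depends on the column
declaration, which a plain operand-based type inference cannot see.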

> The difference between using a udf in partitioned by vs partitioned by a
> computed column is that when you partition by a computed column this column
> must be also computed when reading the table.
A computed column does not have to be computed when reading the table
if the query does not use it. The optimizer can take care of this.
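
As a toy illustration of that pruning (plain Python, not planner code), the
sketch below picks the computed columns a scan has to evaluate for a given
query. The function name, the input shapes, and the assumption that a computed
column may read another computed column are all mine.

```python
def columns_to_compute(computed, referenced):
    """Return the computed columns a scan must evaluate for one query.

    computed:   dict of computed column name -> set of columns its
                expression reads
    referenced: set of column names the query actually uses
    """
    needed = set()
    stack = [c for c in referenced if c in computed]
    while stack:
        col = stack.pop()
        if col in needed:
            continue
        needed.add(col)
        # In this toy model a computed column may read other computed
        # columns, so their dependencies are followed as well.
        stack.extend(d for d in computed[col] if d in computed)
    return needed


# Hypothetical table: "part" is derived from the physical column "ts",
# "off" stands for a metadata column with no inputs.
computed = {"part": {"ts"}, "off": set()}
unused = columns_to_compute(computed, {"id", "name"})   # nothing to compute
used = columns_to_compute(computed, {"id", "part"})     # only "part"
```

A query that never mentions "part" pays nothing for it, even if the table is
partitioned by that column.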


[1]:
https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#ksql-timestamp-formats


On Mon, 2 Mar 2020 at 18:16, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Jark,
>
> Ad. 2 I added a section to discuss relation to FLIP-63
>
> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties.
> Therefore you have the key.format.type.
>
> I also considered exactly what you are suggesting (prefixing with
> connector or kafka). I should've put that into an Option/Rejected
> alternatives.
>
> I agree timestamp, key.*, value.* are connector properties. Why I wanted
> to suggest not adding that prefix in the first version is that actually all
> the properties in the WITH section are connector properties. Even format is
> in the end a connector property as some of the sources might not have a
> format, imo. The benefit of not adding the prefix is that it makes the keys
> a bit shorter. Imagine prefixing all the properties with connector (or if
> we go with FLINK-12557: elasticsearch):
>
> elasticsearch.key.format.type: csv
>
> elasticsearch.key.format.field: ....
>
> elasticsearch.key.format.delimiter: ....
>
> elasticsearch.key.format.*: ....
>
> I am fine with doing it though if this is a preferred approach in the
> community.
>
> Ad in-line comments:
>
> I forgot to update the `value.fields.include` property. It should be
> `value.fields-include`, which I think you also suggested in the comment,
> right?
>
> As for the cast vs declaring output type of computed column. I think it's
> better not to use CAST, but declare a type of an expression and later on
> infer the output type of SYSTEM_METADATA. The reason is I think this way it
> will be easier to implement e.g. filter push downs when working with the
> native types of the source, e.g. in case of Kafka's offset, I think it's
> better to push down long rather than string. This could let us push
> expressions like e.g. offset > 12345 && offset < 59382. Otherwise we would
> have to push down cast(offset, long) > 12345 && cast(offset, long) <
> 59382. Moreover I think we need to introduce the type for computed columns
> anyway to support functions that infer output type based on expected return
> type.
>
> As for the computed column push down. Yes, SYSTEM_METADATA would have to
> be pushed down to the source. If it is not possible the planner should
> fail. As far as I know computed columns push down will be part of source
> rework, won't it? ;)
>
> As for the persisted computed column. I think it is completely orthogonal.
> In my current proposal you can also partition by a computed column. The
> difference between using a udf in partitioned by vs partitioned by a
> computed column is that when you partition by a computed column this column
> must be also computed when reading the table. If you use a udf in the
> partitioned by, the expression is computed only when inserting into the
> table.
>
> Hope this answers some of your questions. Looking forward to further
> suggestions.
>
> Best,
>
> Dawid
>
>
>
> On 02/03/2020 05:18, Jark Wu wrote:
>
> Hi,
>
> Thanks Dawid for starting such a great discussion. Reading metadata and
> key-part information from source is an important feature for streaming
> users.
>
> In general, I agree with the proposal of the FLIP.
> I will leave my thoughts and comments here:
>
> 1) +1 to use connector properties instead of introducing HEADER keyword as
> the reason you mentioned in the FLIP.
> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a
> section to explain what's the relationship between them.
>     Do their concepts conflict? Could INSERT PARTITION be used on the
> PARTITIONED table in this FLIP?
> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the
> newly introduced properties more hierarchical?
>     For example, "timestamp" => "connector.timestamp"? (actually, I prefer
> "kafka.timestamp" which is another improvement for properties FLINK-12557)
>     A single "timestamp" in properties may mislead users into thinking that
> the field is a rowtime attribute.
>
> I also left some minor comments in the FLIP.
>
> Thanks,
> Jark
>
>
>
> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>
> Hi,
>
> I would like to propose an improvement that would enable reading table
> columns from different parts of source records. Besides the main payload,
> the majority of sources (if not all) expose additional information. It
> can be simply read-only metadata such as offset or ingestion time, or
> readable and writable parts of the record that contain data but additionally
> serve different purposes (partitioning, compaction etc.), e.g. key or
> timestamp in Kafka.
>
> We should make it possible to read and write data from all of those
> locations. In this proposal I discuss reading partitioning data. For
> completeness, it also discusses partitioning when writing data out.
>
> I am looking forward to your comments.
>
> You can access the FLIP here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>
> Best,
>
> Dawid
>
>
>
>
>
