[DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-01 Thread Dawid Wysakowicz
Hi, I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information. It can be simply read-only metadata such as offset, ingestion time, or a read and
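
For orientation, a minimal sketch (written in the METADATA syntax this thread eventually converges on; topic and column names are assumptions) of what exposing record metadata as table columns looks like:

```sql
-- Hedged sketch: expose Kafka record metadata next to the payload columns.
CREATE TABLE kafka_table (
  id BIGINT,                                          -- from the message body
  name STRING,                                        -- from the message body
  record_offset BIGINT METADATA FROM 'offset' VIRTUAL -- read-only record metadata
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```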

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-01 Thread Jark Wu
Hi, Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from sources is an important feature for streaming users. In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here: 1) +1 to use connector properties instead of int

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-02 Thread Dawid Wysakowicz
Hi Jark, Ad. 2 I added a section to discuss the relation to FLIP-63. Ad. 3 Yes, I also tried to somewhat keep a hierarchy of properties. Therefore you have the key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rej

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-02 Thread Jark Wu
Hi Dawid, > connector properties Could we use "timestamp.field" instead of "timestamp"? This would be more consistent with "key.fields" and would avoid confusing users into thinking it defines a rowtime attribute (KSQL [1] uses the "timestamp" property to override ROWTIME information). > SYSTEM_METADATA(...) I ag

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-02 Thread Leonard Xu
Hi Dawid, Thanks for driving this FLIP, big +1 for the proposed feature. About the connector properties part, I suggest avoiding 'timestamp' because TIMESTAMP is a keyword in DDL as a data type and users may feel confused; would 'timestamp.field' or 'source.timestamp' be better? ``` CREATE TABLE
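
Leonard's example is cut off above; here is a minimal sketch of the property-based variant being discussed at this stage (property names such as 'key.fields' and 'timestamp.field' were proposals under debate, not final options):

```sql
-- Proposal-stage sketch: metadata and key wired up through connector properties.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  event_ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json',
  'key.fields' = 'id',            -- proposed: columns that form the Kafka key
  'timestamp.field' = 'event_ts'  -- proposed: map the record timestamp to a column
);
```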

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-03 Thread Danny Chan
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~ About how the metadata is output from the source: I think it is completely orthogonal; computed column push-down is another topic. This should not be a blocker but an enhancement, if we do not have any filters on the computed

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-03 Thread Dawid Wysakowicz
Hi, 1. I thought a bit more about how the source would emit the columns and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP, as you asked, Jark. I mostly agree with Danny on how we should do that. One additional thing I would introdu

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-03 Thread Jark Wu
Thanks Dawid, I have two more questions. > SupportsMetadata Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface. 1) How does the source know the expected return type of each metadata field? 2) Where to put the metadata fields? Append to the existing

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-04 Thread Kurt Young
Hi Dawid, I have a couple of questions around key fields; actually I also have some other questions but want to stay focused on key fields first. 1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? Because for reading, I can't imagine how such op

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-03-04 Thread Kurt Young
Sorry, forgot one question. 4. Can we make value.fields-include more orthogonal? For example, one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to ignore just the timestamp but keep the key. Best, Kurt On Wed, Mar 4, 2020 at 4:42 P
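
For context, a sketch of the option Kurt is questioning, using the names from the proposal (the enum values were still under discussion at this point):

```sql
-- With a single value such as EXCEPT_KEY, "exclude the timestamp but keep
-- the key" cannot be expressed, which is Kurt's orthogonality concern.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.format' = 'json',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'  -- value format sees all fields minus key fields
);
```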

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-08-11 Thread Dongwon Kim
Big +1 for this FLIP. Recently I'm working with some Kafka topics that carry timestamps as metadata, not in the message body. I want to declare a table over these topics with DDL, but "rowtime_column_name" seems to accept only existing columns. > : > WATERMARK FOR rowtime_column_name AS watermar
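
What Dongwon is asking for, expressed as a hedged sketch in the syntax the thread later settles on (column and topic names assumed, type simplified):

```sql
-- Use the Kafka record timestamp (metadata, not in the message body) as rowtime.
CREATE TABLE kafka_table (
  id BIGINT,
  event_time TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```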

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-08-11 Thread Kurt Young
The content length of FLIP-107 is relatively short, but its scope and implications are actually very big. From what I can tell now, I think there is a good chance that we can deliver part of this FLIP in 1.12, e.g. accessing the metadata field just like you mentioned. Best, Kurt On

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-08-11 Thread Leonard Xu
+1 for FLIP-107. Reading different parts of source records should be a key feature for Flink SQL, like metadata in CDC data, or key and timestamp in Kafka records. The scope of FLIP-107 is too big to finish in one version IMO; maybe we can start part of the work in 1.12. Best Leonard > On Aug 11, 2020, at 19:5

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-04 Thread Timo Walther
Hi everyone, I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It takes all of the latest FLIPs into account, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partition

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-04 Thread Dawid Wysakowicz
Hi Timo, Thank you very much for the update. It indeed covers the full story in more detail. I agree with the proposal. On 04/09/2020 10:48, Timo Walther wrote: > Hi everyone, > > I completely reworked FLIP-107. It now covers the full story of how to > read and write metadata from/to connectors and

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-04 Thread Aljoscha Krettek
I like the proposal! I didn't check the implementation section in detail, but the SQL DDL examples look good, as do the options for specifying how fields are mapped to keys/values. Aljoscha On 04.09.20 11:47, Dawid Wysakowicz wrote: Hi Timo, Thank you very much for the update. I

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Konstantin Knauf
Hi Timo, Thanks a lot for picking up this FLIP. I believe it's a very important one for almost everyone who uses Flink SQL with Kafka. Also +1 to leave out partitioning for now. Best, Konstantin On Fri, Sep 4, 2020 at 1:37 PM Aljoscha Krettek wrote: > I like the proposal! I didn't check the i

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Leonard Xu
Thanks Timo for the update! I like the proposal in general, but I have some questions about the doc. (1) About the DDL `CAST(SYSTEM_METADATA("offset") AS INT)`: it looks like we use `CAST` to resolve the nullability of the metadata column type. Could we use an explicit type instead (i.e. SYSTEM_METADATA("offset") INT

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Leonard Xu
Ignore my question (4), I've found the answer in the doc: 'value.fields-include' = 'EXCEPT_KEY' (all fields of the schema minus fields of the key) > On Sep 7, 2020, at 16:33, Leonard Xu wrote: > > (4) About reading and writing from the key and value sections, we bind that the > fields of the key part must belo

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Timo Walther
Hi Leonard, thanks for your feedback. (1) Actually, I already discuss this in the FLIP. But let me summarize our options again in case it was not clear enough in the FLIP: a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT)) pro: readable, complex arithmetic possible, more SQL compliant
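
Option a) as a complete statement, for readability (proposal-stage syntax that was later replaced by METADATA; the CAST pins down the column type):

```sql
-- Computed-column style: the planner extracts SYSTEM_METADATA and the CAST
-- fixes the declared type of the column.
CREATE TABLE t (
  id BIGINT,
  a AS CAST(SYSTEM_METADATA("offset") AS INT)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```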

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Jark Wu
Thanks Timo, I think this FLIP is already in great shape! I have the following questions: 1. `Map<String, DataType> listReadableMetadata()` only allows one possible DataType for a metadata key. However, users may expect to use different types, e.g. for "timestamp" metadata, users may use it as BIGINT, or TIMESTAMP(6)

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Jark Wu
Sorry, I forgot to ask one more question. 4. Do we allow using SYSTEM_METADATA as a sub-expression? For example, checksum AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP)['checksum'] AS STRING), myvalue AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP)['mykey'] AS BIGINT) And we will push down
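
Jark's question laid out as a full statement (the MAP type parameters are assumptions, the preview elides them): can the planner still recognize and push down SYSTEM_METADATA when it is buried inside larger expressions?

```sql
-- SYSTEM_METADATA as a sub-expression inside computed columns (proposal-stage syntax).
CREATE TABLE kafka_table (
  id BIGINT,
  checksum AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)['checksum'] AS STRING),
  myvalue  AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)['mykey'] AS BIGINT)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```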

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Timo Walther
Hi Jark, 1. "`Map<String, DataType> listReadableMetadata()` only allows one possible DataType for a metadata key." I was thinking about this topic a lot today. My conclusion is: yes, we should force users to specify the type as documented. Users can further cast or compute using expressions to more specific typ

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Jark Wu
Hi Timo, 1. "`Map<String, DataType> listReadableMetadata()` only allows one possible DataType for a metadata key." I think the main purpose of the metadata feature is to access the Kafka timestamp and use it as a rowtime attribute. If we force users to use the specific type, then this feature might be tricky to us

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Danny Chan
Thanks Timo ~ The FLIP was already in pretty good shape, I have only 2 questions here: 1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner." What are the pros of following the SQL Server syntax here? Usually an exp

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-07 Thread Leonard Xu
Hi, Timo Thanks for your explanation and update, I have only one question about the latest FLIP. About the MAP DataType of the key 'debezium-json.source': if users want to use the table name metadata, they need to write: tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP)['table'
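
Leonard's pain point as a full statement (MAP type parameters and topic name are assumptions): extracting a single field from the MAP-typed 'debezium-json.source' metadata is verbose in the computed-column style:

```sql
-- Pulling one entry out of the Debezium source metadata map (proposal-stage syntax).
CREATE TABLE cdc_table (
  id BIGINT,
  tableName AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc-topic',
  'format' = 'debezium-json'
);
```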

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther
Hi everyone, I updated the FLIP again and hope that I could address the mentioned concerns. @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics. I updated the FLIP and expose the most commonly used properties separately. So frequently use

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Jark Wu
Hi Timo, The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE". So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of Kafka timestamp? I think this makes sens

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther
Hi Jark, according to Flink's and Calcite's casting definitions in [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-) I'm aware of DeserializationSchema.getProducedType, but I think that this method is actually misplaced. The type should

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Leonard Xu
Hi, Timo Thanks for driving this FLIP. Sorry, but I have a concern about the 'Writing metadata via DynamicTableSink' section: CREATE TABLE kafka_table ( id BIGINT, name STRING, timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED, headers AS CAST(SYSTEM_METADATA("headers") AS MAP
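
The 'writing metadata' shape Leonard is quoting (truncated above), completed as a hedged sketch from the surrounding discussion: PERSISTED marks a computed metadata column that is also written on INSERT.

```sql
-- Proposal-stage sketch of writable metadata via PERSISTED computed columns.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
  headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```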

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Timo Walther
Hi Leonard, the only alternative I see is that we introduce a concept that is completely different from computed columns. This is also mentioned in the rejected alternatives section of the FLIP. Something like: CREATE TABLE kafka_table ( id BIGINT, name STRING, timestamp INT SYSTEM_M
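
Timo's alternative is truncated above; a guess at its full shape, based on the "offset INT SYSTEM_METADATA(...)" form debated in the following messages:

```sql
-- Metadata declared like a regular column, without the computed-column AS syntax.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` INT SYSTEM_METADATA("timestamp"),
  headers MAP<STRING, BYTES> SYSTEM_METADATA("headers")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```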

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-08 Thread Kurt Young
I also share the concern that reusing the computed column syntax with different semantics would confuse users a lot. Besides, I think metadata fields are conceptually not the same as computed columns. A metadata field is a connector-specific thing and it only contains the information that

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther
Hi Kurt, thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax; @Jark, are you fine with this as well? The non-computed-column approach was only a "slightly rejected alternative". Furthermore, we would need to think a

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
Hi everyone, I think we have a conclusion that writable metadata shouldn't be defined as a computed column, but as a normal column. "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL-standard compliant; we need to be cautious enough when adding new s

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther
Hi Jark, now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax. Compute

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Kurt Young
I would vote for `offset INT SYSTEM_METADATA("offset")`. I don't think we can stick with the SQL standard in the DDL part forever, especially as there are more and more requirements coming from different connectors and external systems. Best, Kurt On Wed, Sep 9, 2020 at 4:40 PM Timo Walther wrote

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Danny Chan
"offset INT SYSTEM_METADATA("offset")" This is actually Oracle or MySQL style computed column syntax. "You are right that one could argue that "timestamp", "headers" are something like "key" and "value"" I have the same feeling, both key value and headers timestamp are *real* data stored in the

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
Hi Danny, This is not Oracle and MySQL computed column syntax, because there is no "AS" after the type. Hi everyone, if we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem. Personally, I t

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Leonard Xu
Hi everyone, I'm +1 for "offset INT SYSTEM_METADATA("offset")" if we have to make a choice. It's not a generated column syntax, and thus we can get rid of the limitations of generated columns. About distinguishing read-only metadata from writable metadata, I prefer to add a keyword after SYSTE

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther
Hi everyone, "key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work but as the FLIP discusses, we have way more metadata fields like headers, epoch-leader,

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Dawid Wysakowicz
Hi, Sorry for joining so late. First of all, I don't want to distract the discussion, but I thought maybe my opinion could help a bit, though maybe it won't ;) The first observation I have is that I think everyone agrees we need a way to distinguish the read-only from the r/w columns. Is that correct? Seco

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Danny Chan
"Personally, I still like the computed column design more because it allows to have full flexibility to compute the final column" I have the same feeling; the non-standard syntax "timestamp INT SYSTEM_METADATA("ts")" is neither a computed column nor a normal column. It looks very much like a computed c

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther
+1 for: timestamp INT METADATA [FROM 'my-timestamp-field'] However, I would invert the default, because reading is more common than writing. Regards, Timo On 09.09.20 14:25, Danny Chan wrote: "Personally, I still like the computed column design more because it allows to have full flexibil

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Leonard Xu
Thanks @Dawid for the nice summary, I think you caught all the opinions of this long discussion well. @Danny "timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]" Note that the "FROM 'field name'" is only needed when the name conflicts with the declared table column name; when there are n

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Jark Wu
I'm also +1 for Danny's proposal: timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL] Especially I like the shortcut timestamp INT METADATA; this makes the most common case supported in the simplest way. I also think the default should be "PERSISTED", so VIRTUAL is optional when you
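
The shape the thread is converging on, shown as a complete example (column names and types are illustrative; this is the syntax that ultimately went into FLIP-107):

```sql
-- METADATA with optional FROM (rename) and VIRTUAL (exclude from INSERT schema).
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA,  -- shortcut: column name is the metadata key
  record_offset BIGINT METADATA FROM 'offset' VIRTUAL      -- renamed, read-only
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```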

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-09 Thread Timo Walther
"If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column." Thanks for this nice mental model explanation, Jark. This makes total sense to me. Also making the the

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Timo Walther
Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions. If there are no objections, I would continue with a voting. What do you think? Regards, Timo On 09.09.20 16:

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Danny Chan
Thanks for driving this Timo, +1 for voting ~ Best, Danny Chan On Sep 10, 2020 at 3:47 PM +0800, Timo Walther wrote: > Thanks everyone for this healthy discussion. I updated the FLIP with the > outcome. I think the result is very powerful but also very easy to > declare. Thanks for all the contributions. >

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Jark Wu
Hi Timo, I have one minor suggestion. Maybe the default data type of `timestamp` could be `TIMESTAMP(3) WITH LOCAL TIME ZONE`, because this is the type that users want to use; this can avoid unnecessary casting. Besides, currently the BIGINT is cast to a timestamp in seconds, so the implicit cast

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Kurt Young
The new syntax looks good to me. Best, Kurt On Thu, Sep 10, 2020 at 5:57 PM Jark Wu wrote: > Hi Timo, > > I have one minor suggestion. > Maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH > LOCAL TIME ZONE`, because this is the type that users want to use, this can > avoid u

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Aljoscha Krettek
I've only been watching this from the sidelines but that latest proposal looks very good to me! Aljoscha On 10.09.20 12:20, Kurt Young wrote: The new syntax looks good to me. Best, Kurt On Thu, Sep 10, 2020 at 5:57 PM Jark Wu wrote: Hi Timo, I have one minor suggestion. Maybe the defaul

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Leonard Xu
Hi, Timo Thanks for the update. I have a minor suggestion about the Debezium metadata keys: could we use the original Debezium keys rather than introduce new ones?
debezium-json.schema => debezium-json.schema
debezium-json.ingestion-timestamp => debezium-json.ts_ms
debezium-j
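
A hedged sketch of Leonard's suggestion in use (metadata key and types assumed): address the format metadata by Debezium's own field name instead of a newly invented key:

```sql
-- Consuming a Debezium-named metadata key rather than a renamed one.
CREATE TABLE cdc_table (
  id BIGINT,
  name STRING,
  ingestion_ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'debezium-json.ts_ms' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'cdc-topic',
  'format' = 'debezium-json'
);
```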

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-10 Thread Timo Walther
Hi everyone, I had to add some last-minute changes to FLIP-107. Jark pointed out that we might also discuss the implications for the `Schema` class from FLIP-129 as well as the LIKE clause from FLIP-110. I added another small section to the document. I just started a vote; if you have objecti

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-14 Thread Leonard Xu
Hi, Timo Thanks for your explanation, it makes sense to me. Best, Leonard >> Hi, Timo >> Thanks for the update >> I have a minor suggestion about the debezium metadata key, >> Could we use the original debezium key rather than import new key? >> debezium-json.schema

Re: [DISCUSS] FLIP-107: Reading table columns from different parts of source records

2020-09-16 Thread Jark Wu
Thanks Timo, The updates to `Schema` and the LIKE clause look good to me. Best, Jark On Tue, 15 Sep 2020 at 10:30, Leonard Xu wrote: > Hi, Timo > > Thanks for your explanation, it makes sense to me. > > Best, > Leonard > > > >> Hi, Timo > >> Thanks for the update > >> I have a minor suggestion ab