Hi Timo,

Thanks for the update. I have a minor suggestion about the Debezium metadata keys: could we use the original Debezium keys rather than introducing new ones?

debezium-json.schema              => debezium-json.schema
debezium-json.ingestion-timestamp => debezium-json.ts_ms
debezium-json.source.database     => debezium-json.source.db
debezium-json.source.schema       => debezium-json.source.schema
debezium-json.source.table        => debezium-json.source.table
debezium-json.source.timestamp    => debezium-json.source.ts_ms
debezium-json.source.properties   => debezium-json.source  MAP<STRING, STRING>

Users who are familiar with Debezium will understand these keys more easily, and the key syntax is more JSON-path-like. What do you think?

The other parts look really good to me.

Regards,
Leonard

> On 10 Sep 2020, at 18:26, Aljoscha Krettek <[email protected]> wrote:
>
> I've only been watching this from the sidelines but that latest proposal
> looks very good to me!
>
> Aljoscha
>
> On 10.09.20 12:20, Kurt Young wrote:
>> The new syntax looks good to me.
>> Best,
>> Kurt
>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <[email protected]> wrote:
>>> Hi Timo,
>>>
>>> I have one minor suggestion.
>>> Maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH
>>> LOCAL TIME ZONE`, because this is the type that users want to use; this
>>> avoids unnecessary casting.
>>> Besides, the BIGINT is currently cast to a timestamp in seconds, so the
>>> implicit cast may not work...
>>>
>>> I don't have other objections. But maybe we should wait for the
>>> opinion from @Kurt on the new syntax.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <[email protected]> wrote:
>>>
>>>> Thanks for driving this Timo, +1 for voting ~
>>>>
>>>> Best,
>>>> Danny Chan
>>>> On 10 Sep 2020 at 3:47 PM +0800, Timo Walther <[email protected]> wrote:
>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>>>>> outcome. I think the result is very powerful but also very easy to
>>>>> declare. Thanks for all the contributions.
>>>>>
>>>>> If there are no objections, I would continue with a vote.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 09.09.20 16:52, Timo Walther wrote:
>>>>>> "If virtual by default, when a user types "timestamp int" ==> persisted
>>>>>> column, then adds a "metadata" after that ==> virtual column, then adds
>>>>>> a "persisted" after that ==> persisted column."
>>>>>>
>>>>>> Thanks for this nice mental model explanation, Jark. This makes total
>>>>>> sense to me. Also making the most common case as short as just
>>>>>> adding `METADATA` is a very good idea. Thanks, Danny!
>>>>>>
>>>>>> Let me update the FLIP again with all these ideas.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 15:03, Jark Wu wrote:
>>>>>>> I'm also +1 to Danny's proposal:
>>>>>>>     timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>> I especially like the shortcut `timestamp INT METADATA`; it supports
>>>>>>> the most common case in the simplest way.
>>>>>>>
>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is optional
>>>>>>> when you are accessing read-only metadata. Because:
>>>>>>> 1. "timestamp INT METADATA" should be a normal column, because
>>>>>>>    "METADATA" is just a modifier to indicate it is from metadata, and a
>>>>>>>    normal column should be persisted.
>>>>>>>    If virtual by default, when a user types "timestamp int" ==> persisted
>>>>>>>    column, then adds a "metadata" after that ==> virtual column, then
>>>>>>>    adds a "persisted" after that ==> persisted column.
>>>>>>>    I think this flips back and forth several times and confuses users.
>>>>>>>    Physical fields are also declared as "fieldName TYPE", so treating
>>>>>>>    "timestamp INT METADATA" as persisted is very straightforward.
>>>>>>> 2. From the collected user questions [1], we can see that "timestamp"
>>>>>>>    is the most common use case. "timestamp" is read-write metadata.
>>>>>>>    Persisted by default doesn't break the reading behavior.
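As a rough sketch of the syntax Danny proposed and Jark endorses above, a table declaration could look like the following. The column types and the metadata keys ('headers', 'offset') are taken from examples elsewhere in this thread rather than from any released connector, the column name record_headers is made up for illustration, and the backticks assume that timestamp and offset are reserved words in the SQL dialect. Treat all of these as assumptions.

```sql
-- Sketch only: illustrates the proposed METADATA [FROM '...'] [VIRTUAL] modifiers.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- Shortest form: the column name doubles as the metadata key.
  -- Persisted by default, so the column is also part of the sink schema.
  `timestamp` INT METADATA,
  -- Explicit key, needed when the column name differs from the metadata key.
  -- (record_headers is a hypothetical column name.)
  record_headers MAP<STRING, BYTES> METADATA FROM 'headers',
  -- Read-only metadata: VIRTUAL keeps the column out of the sink schema.
  `offset` INT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'value.format' = 'avro'
);
```

With this declaration, an INSERT INTO kafka_table would be expected to supply id, name, timestamp and record_headers, but not offset, which is the query-sink schema point made in item 1 above.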
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869
>>>>>>>
>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks @Dawid for the nice summary, I think you captured all opinions
>>>>>>>> of the long discussion well.
>>>>>>>>
>>>>>>>> @Danny
>>>>>>>> "timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>>> Note that the "FROM 'field name'" is only needed when the name
>>>>>>>> conflicts with the declared table column name; when there are no
>>>>>>>> conflicts, we can simplify it to
>>>>>>>> timestamp INT METADATA"
>>>>>>>>
>>>>>>>> I really like the proposal: there is no confusion with computed
>>>>>>>> columns any more, and it's concise enough.
>>>>>>>>
>>>>>>>>
>>>>>>>> @Timo @Dawid
>>>>>>>> "We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>>>>>>>> SYSTEM makes it clearer that it comes magically from the system."
>>>>>>>> "As for the issue of shortening the SYSTEM_METADATA to METADATA. Here
>>>>>>>> I very much prefer the SYSTEM_ prefix."
>>>>>>>>
>>>>>>>> I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`.
>>>>>>>> First of all, the word `TIME` has broad meanings but the word
>>>>>>>> `METADATA` does not; `METADATA` has a specific meaning.
>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard but
>>>>>>>> `SYSTEM_METADATA` does not.
>>>>>>>> Personally, I prefer the simpler way; sometimes less is more.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Leonard
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, 9 Sep 2020 at 6:41 PM, Timo Walther <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> "key" and "value" in the properties are a special case because they
>>>>>>>>>> need to configure a format. So key and value are more than just
>>>>>>>>>> metadata.
>>>>>>>>>> Jark's example for setting a timestamp would work but as the FLIP
>>>>>>>>>> discusses, we have way more metadata fields like headers,
>>>>>>>>>> epoch-leader, etc. Having a property for all of this metadata would
>>>>>>>>>> mess up the WITH section entirely. Furthermore, we also want to deal
>>>>>>>>>> with metadata from the formats. Solving this through properties as
>>>>>>>>>> well would further complicate the property design.
>>>>>>>>>>
>>>>>>>>>> Personally, I still like the computed column design more because it
>>>>>>>>>> allows full flexibility to compute the final column:
>>>>>>>>>>
>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>>>>>>>>>>
>>>>>>>>>> Instead of having a helper column and a real column in the table:
>>>>>>>>>>
>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp)
>>>>>>>>>>
>>>>>>>>>> But I see that the discussion leans towards:
>>>>>>>>>>
>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts")
>>>>>>>>>>
>>>>>>>>>> Which is fine with me. It is the shortest solution, because we don't
>>>>>>>>>> need an additional CAST. We can discuss the syntax, so that confusion
>>>>>>>>>> with computed columns can be avoided.
>>>>>>>>>>
>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts")
>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts")
>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>>>>>>>>>>
>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>>>>>>>>>> SYSTEM makes it clearer that it comes magically from the system.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote:
>>>>>>>>>>> Hi Danny,
>>>>>>>>>>>
>>>>>>>>>>> This is not Oracle and MySQL computed column syntax, because there
>>>>>>>>>>> is no "AS" after the type.
>>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")", then I
>>>>>>>>>>> think we must further discuss the "PERSISTED" or "VIRTUAL" keyword
>>>>>>>>>>> for the query-sink schema problem.
>>>>>>>>>>> Personally, I think we can use a shorter keyword "METADATA" for
>>>>>>>>>>> "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system
>>>>>>>>>>> function and may mislead users into thinking this is a computed
>>>>>>>>>>> column.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")"
>>>>>>>>>>>>
>>>>>>>>>>>> This is actually Oracle or MySQL style computed column syntax.
>>>>>>>>>>>>
>>>>>>>>>>>> "You are right that one could argue that "timestamp", "headers" are
>>>>>>>>>>>> something like "key" and "value""
>>>>>>>>>>>>
>>>>>>>>>>>> I have the same feeling: key, value, headers and timestamp are all
>>>>>>>>>>>> *real* data stored in the consumed record; they are not computed or
>>>>>>>>>>>> generated.
>>>>>>>>>>>>
>>>>>>>>>>>> "Trying to solve everything via properties sounds rather like a hack
>>>>>>>>>>>> to me"
>>>>>>>>>>>>
>>>>>>>>>>>> Things are not that hacky if we can unify the routines or the
>>>>>>>>>>>> definitions (all via computed columns or all via table options). I
>>>>>>>>>>>> also think that it is hacky to mix two kinds of syntax for
>>>>>>>>>>>> different kinds of metadata (read-only and read-write). In this
>>>>>>>>>>>> FLIP, we declare the Kafka key fields with table options but use
>>>>>>>>>>>> SYSTEM_METADATA for other metadata; that is hacky, or at least
>>>>>>>>>>>> inconsistent.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 9 Sep 2020 at 4:48 PM, Kurt Young <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I would vote for `offset INT SYSTEM_METADATA("offset")`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think we can stick with the SQL standard in the DDL part
>>>>>>>>>>>>> forever, especially as there are more and more requirements coming
>>>>>>>>>>>>> from different connectors and external systems.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> now we are back at the original design proposed by Dawid :D
>>>>>>>>>>>>>> Yes, we should be cautious about adding new syntax. But the length
>>>>>>>>>>>>>> of this discussion shows that we are looking for a good long-term
>>>>>>>>>>>>>> solution. In this case I would rather vote for a deep integration
>>>>>>>>>>>>>> into the syntax.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant. And our DDL
>>>>>>>>>>>>>> is neither, so we have some degree of freedom here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trying to solve everything via properties sounds rather like a
>>>>>>>>>>>>>> hack to me. You are right that one could argue that "timestamp",
>>>>>>>>>>>>>> "headers" are something like "key" and "value". However, mixing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `'timestamp.field' = 'ts'`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> looks more confusing to users than an explicit
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> that is symmetric for both source and sink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do others think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote:
>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think we have a conclusion that the writable metadata shouldn't
>>>>>>>>>>>>>>> be defined as a computed column, but as a normal column.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
>>>>>>>>>>>>>>> approaches. However, it is not SQL standard compliant, and we
>>>>>>>>>>>>>>> need to be cautious when adding new syntax.
>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or `VIRTUAL`
>>>>>>>>>>>>>>> keyword to resolve the query-sink schema problem if it is
>>>>>>>>>>>>>>> read-only metadata. That adds more stuff for users to learn.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> From my point of view, "timestamp" and "headers" are something
>>>>>>>>>>>>>>> like "key" and "value" that are stored with the real data.
>>>>>>>>>>>>>>> So why not define the "timestamp" in the same way as "key" by
>>>>>>>>>>>>>>> using a "timestamp.field" connector option?
>>>>>>>>>>>>>>> On the other hand, read-only metadata, such as "offset",
>>>>>>>>>>>>>>> shouldn't be defined as a normal column. So why not use the
>>>>>>>>>>>>>>> existing computed column syntax for such metadata? Then we don't
>>>>>>>>>>>>>>> have the query-sink schema problem.
>>>>>>>>>>>>>>> So here is my proposal:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>   col1 STRING,
>>>>>>>>>>>>>>>   col2 STRING,
>>>>>>>>>>>>>>>   ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written.
>>>>>>>>>>>>>>>   offset AS SYSTEM_METADATA("offset")
>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>>>>>>>   'topic' = 'test-topic',
>>>>>>>>>>>>>>>   'key.fields' = 'id, name',
>>>>>>>>>>>>>>>   'key.format' = 'csv',
>>>>>>>>>>>>>>>   'value.format' = 'avro',
>>>>>>>>>>>>>>>   'timestamp.field' = 'ts'  -- define the mapping of the Kafka timestamp
>>>>>>>>>>>>>>> );
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> INSERT INTO kafka_table
>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM another_table;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this can solve all the problems without introducing any
>>>>>>>>>>>>>>> new syntax.
>>>>>>>>>>>>>>> The only minor disadvantage is that read-only metadata and
>>>>>>>>>>>>>>> read-write fields are defined in different ways.
>>>>>>>>>>>>>>> However, I don't think this is a big problem.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Kurt,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up for not reusing
>>>>>>>>>>>>>>>> computed columns. I think Jark was a big supporter of this
>>>>>>>>>>>>>>>> syntax, @Jark are you fine with this as well? The non-computed
>>>>>>>>>>>>>>>> column approach was only a "slightly rejected alternative".
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Furthermore, we would need to think about how such a new design
>>>>>>>>>>>>>>>> influences the LIKE clause though.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED` keyword as it
>>>>>>>>>>>>>>>> influences the query->sink schema. If you look at the list of
>>>>>>>>>>>>>>>> metadata for existing connectors and formats, we currently offer
>>>>>>>>>>>>>>>> only two writable metadata fields. Otherwise, one would need to
>>>>>>>>>>>>>>>> declare two tables whenever a metadata column is read (one for
>>>>>>>>>>>>>>>> the source, one for the sink). This can be quite inconvenient,
>>>>>>>>>>>>>>>> e.g. for just reading the topic.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote:
>>>>>>>>>>>>>>>>> I also share the concern that reusing the computed column
>>>>>>>>>>>>>>>>> syntax with different semantics would confuse users a lot.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Besides, I think metadata fields are conceptually not the same
>>>>>>>>>>>>>>>>> as computed columns.
>>>>>>>>>>>>>>>>> The metadata field is a connector-specific thing and it only
>>>>>>>>>>>>>>>>> carries the information about where the field comes from (on the
>>>>>>>>>>>>>>>>> source side) or where the field needs to be written to (on the
>>>>>>>>>>>>>>>>> sink side). It's more similar to normal fields, under the
>>>>>>>>>>>>>>>>> assumption that all these fields go to the data part.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thus I lean more towards the rejected alternative that Timo
>>>>>>>>>>>>>>>>> mentioned. And I think we don't need the PERSISTED keyword;
>>>>>>>>>>>>>>>>> SYSTEM_METADATA should be enough.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> During implementation, the framework only needs to pass such
>>>>>>>>>>>>>>>>> <field, metadata field> information to the connector, and the
>>>>>>>>>>>>>>>>> logic of handling such fields inside the connector should be
>>>>>>>>>>>>>>>>> straightforward.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call UDFs or parse
>>>>>>>>>>>>>>>>>> timestamps.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since the metadata
>>>>>>>>>>>>>>>>> field isn't a computed column anymore, we can support
>>>>>>>>>>>>>>>>> referencing such fields in the computed column.
>>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>   timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the timestamp field from metadata
>>>>>>>>>>>>>>>>>   ts AS to_timestamp(timestamp)  -- normal computed column: parse the string to TIMESTAMP type by using the metadata field
>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Leonard,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> the only alternative I see is that we introduce a concept that
>>>>>>>>>>>>>>>>>> is completely different to computed columns. This is also
>>>>>>>>>>>>>>>>>> mentioned in the rejected alternative section of the FLIP.
>>>>>>>>>>>>>>>>>> Something like:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>>   timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
>>>>>>>>>>>>>>>>>>   headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way we would avoid confusion entirely and can easily map
>>>>>>>>>>>>>>>>>> columns to metadata columns. The disadvantage is that users
>>>>>>>>>>>>>>>>>> cannot call UDFs or parse timestamps. This would need to be
>>>>>>>>>>>>>>>>>> done in a real computed column.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm happy about better alternatives.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sorry, but I have a concern about the "Writing metadata via
>>>>>>>>>>>>>>>>>>> DynamicTableSink" section:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>   id BIGINT,
>>>>>>>>>>>>>>>>>>>   name STRING,
>>>>>>>>>>>>>>>>>>>   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
>>>>>>>>>>>>>>>>>>>   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>> An insert statement could look like:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES (
>>>>>>>>>>>>>>>>>>>   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The proposed INSERT syntax does not make sense to me, because
>>>>>>>>>>>>>>>>>>> it contains computed (generated) columns.
>>>>>>>>>>>>>>>>>>> Neither SQL Server nor PostgreSQL allows inserting values into
>>>>>>>>>>>>>>>>>>> computed columns even if they are persisted; this breaks the
>>>>>>>>>>>>>>>>>>> generated column semantics and may confuse users a lot.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For SQL Server computed columns [1]:
>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the target of an INSERT or
>>>>>>>>>>>>>>>>>>>> UPDATE statement.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For PostgreSQL generated columns [2]:
>>>>>>>>>>>>>>>>>>>> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be written to directly. In
>>>>>>>>>>>>>>>>>>>> INSERT or UPDATE commands, a value cannot be specified for a
>>>>>>>>>>>>>>>>>>>> generated column, but the keyword DEFAULT may be specified.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> After looking it up in SQL:2016, setting or updating the value
>>>>>>>>>>>>>>>>>>> of a generated column shouldn't be allowed:
>>>>>>>>>>>>>>>>>>>> <insert statement> ::=
>>>>>>>>>>>>>>>>>>>>   INSERT INTO <insertion target> <insert columns and source>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If <contextually typed table value constructor> CTTVC is
>>>>>>>>>>>>>>>>>>>> specified, then every <contextually typed row value
>>>>>>>>>>>>>>>>>>>> constructor element> simply contained in CTTVC whose
>>>>>>>>>>>>>>>>>>>> positionally corresponding <column name> in <insert column
>>>>>>>>>>>>>>>>>>>> list> references a column of which some underlying column is
>>>>>>>>>>>>>>>>>>>> a generated column shall be a <default specification>.
>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the default value of some
>>>>>>>>>>>>>>>>>>>> associated item.
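The PostgreSQL behaviour quoted above can be sketched as follows. The table name people and the name column are hypothetical; the height_in definition is the one quoted from the PostgreSQL documentation. This assumes PostgreSQL 12 or later, where stored generated columns are available.

```sql
-- Hypothetical table built around the generated column quoted above.
CREATE TABLE people (
  name      text,
  height_cm numeric,
  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
);

-- Accepted: the generated column is omitted from the column list ...
INSERT INTO people (name, height_cm) VALUES ('a', 180);

-- ... or listed, but only with the DEFAULT keyword as its value.
INSERT INTO people (name, height_cm, height_in) VALUES ('b', 170, DEFAULT);

-- Rejected with an error: an explicit value for the generated column.
INSERT INTO people (name, height_cm, height_in) VALUES ('c', 160, 63);
```

This is the semantics that a writable `... AS CAST(SYSTEM_METADATA(...)) PERSISTED` column would contradict, since the INSERT shown earlier supplies explicit values for columns declared with AS.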
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 8 Sep 2020, at 17:31, Timo Walther <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's casting definition in
>>>>>>>>>>>>>>>>>>>> [1][2] TIMESTAMP WITH LOCAL TIME ZONE should be castable from
>>>>>>>>>>>>>>>>>>>> BIGINT. If not, we will make it possible ;-)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm aware of DeserializationSchema.getProducedType but I
>>>>>>>>>>>>>>>>>>>> think that this method is actually misplaced. The type should
>>>>>>>>>>>>>>>>>>>> rather be passed to the source itself.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also not use this method
>>>>>>>>>>>>>>>>>>>> because the Kafka source will add its own metadata in addition
>>>>>>>>>>>>>>>>>>>> to the DeserializationSchema. So
>>>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType will never be read.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For now I suggest leaving out the `DataType` from
>>>>>>>>>>>>>>>>>>>> DecodingFormat.applyReadableMetadata. Also because the
>>>>>>>>>>>>>>>>>>>> format's physical type is passed later in
>>>>>>>>>>>>>>>>>>>> `createRuntimeDecoder`. If necessary, it can be computed
>>>>>>>>>>>>>>>>>>>> manually by consumedType + metadata types. We will provide a
>>>>>>>>>>>>>>>>>>>> metadata utility class for that.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>>>>>>>>>>>>>>>>>>>> [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me.
>>>>>>>>>>>>>>>>>>>>> I just noticed that a BIGINT can't be converted to
>>>>>>>>>>>>>>>>>>>>> "TIMESTAMP(3) WITH LOCAL TIME ZONE".
>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH
>>>>>>>>>>>>>>>>>>>>> LOCAL TIME ZONE" as the defined type of Kafka timestamp?
>>>>>>>>>>>>>>>>>>>>> I think this makes sense, because it represents the
>>>>>>>>>>>>>>>>>>>>> milliseconds since epoch.
>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I
>>>>>>>>>>>>>>>>>>>>> don't think so.
>>>>>>>>>>>>>>>>>>>>> The DeserializationSchema implements ResultTypeQueryable,
>>>>>>>>>>>>>>>>>>>>> thus the implementation needs to return an output TypeInfo.
>>>>>>>>>>>>>>>>>>>>> Besides, FlinkKafkaConsumer also calls
>>>>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType as the produced type
>>>>>>>>>>>>>>>>>>>>> of the source function [1].
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>> [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>>>>>>>>>>>>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP again and hope that I could address the
>>>>>>>>>>>>>>>>>>>>>> mentioned concerns.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that
>>>>>>>>>>>>>>>>>>>>>> ts_ms and source.ts_ms have different semantics. I updated
>>>>>>>>>>>>>>>>>>>>>> the FLIP and expose the most commonly used properties
>>>>>>>>>>>>>>>>>>>>>> separately.
>>>>>>>>>>>>>>>>>>>>>> So frequently used properties are not hidden in the MAP
>>>>>>>>>>>>>>>>>>>>>> anymore:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> debezium-json.ingestion-timestamp
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.timestamp
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.database
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.schema
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.table
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> However, since other properties depend on the used
>>>>>>>>>>>>>>>>>>>>>> connector/vendor, the remaining options are stored in:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.properties
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> And accessed with:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Otherwise it is not possible to figure out the value and
>>>>>>>>>>>>>>>>>>>>>> column type during validation.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Jark: You convinced me to relax the CAST constraints.
>>>>>>>>>>>>>>>>>>>>>> I added a dedicated sub-section to the FLIP:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> To make the use of SYSTEM_METADATA easier and to avoid
>>>>>>>>>>>>>>>>>>>>>> nested casting, we allow explicit casting to a target data
>>>>>>>>>>>>>>>>>>>>>> type:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> A connector still produces and consumes the data type
>>>>>>>>>>>>>>>>>>>>>> returned by `listMetadata()`. The planner will insert
>>>>>>>>>>>>>>>>>>>>>> necessary explicit casts.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In any case, the user must provide a CAST such that the
>>>>>>>>>>>>>>>>>>>>>> computed column receives a valid data type when
>>>>>>>>>>>>>>>>>>>>>> constructing the table schema.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> "I don't see a reason why
>>>>>>>>>>>>>>>>>>>>>> `DecodingFormat#applyReadableMetadata` needs a DataType
>>>>>>>>>>>>>>>>>>>>>> argument."
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Correct, the DeserializationSchema doesn't need TypeInfo; it
>>>>>>>>>>>>>>>>>>>>>> is always executed locally. It is the source that needs
>>>>>>>>>>>>>>>>>>>>>> TypeInfo for serializing the record to the next operator.
>>>>>>>>>>>>>>>>>>>>>> And this is what we provide.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Danny:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> "`SYSTEM_METADATA("offset")` returns the NULL type by
>>>>>>>>>>>>>>>>>>>>>> default"
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> We can also use some other means to represent an UNKNOWN
>>>>>>>>>>>>>>>>>>>>>> data type.
In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.

Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:
Hi, Timo

Thanks for your explanation and update, I have only one question for the latest FLIP.

About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?
tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way, we can simplify the expression. From my side, the mainly used metadata in changelog formats may include 'database', 'table', 'source.ts_ms' and 'ts_ms'; maybe we could only support them in the first version.

Both Debezium and Canal have the above four metadata, and I'm willing to take some subtasks in the next development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change happened in database system, i.e.: transaction time in database.
    "connector": "mysql",
    ….
  },
  "ts_ms": 1589355606100,     # 4.
timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",   # 1. database name the changelog belongs to.
  "table": "products",       # 2. table name the changelog belongs to.
  "es": 1589374013000,       # 3. execution time of the change in database system, i.e.: transaction time in database.
  "ts": 1589374013680,       # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On Sep 8, 2020, at 11:57, Danny Chan <[email protected]> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:

1.
"`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.

And why not use the Oracle or MySQL syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

Which is more straightforward.

2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL because only the NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.
[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On Sep 4, 2020, 4:48 PM +0800, Timo Walther <[email protected]> wrote:
Hi everyone,

I completely reworked FLIP-107. It now covers the full story how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:
Sorry, forgot one question.

4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to ignore just the timestamp but keep the key.

Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[email protected]> wrote:

Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during write operation? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in write operation, I want to propose that we simplify the options by not introducing key.format.type and other related options.
I think a single "key.field" (not fields) would be enough, users can use a UDF to calculate whatever key they want before the sink.

3. Also I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of key and values. The old parameter "format.type" is already good enough to use.

Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[email protected]> wrote:

Thanks Dawid,

I have two more questions.

> SupportsMetadata
Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata?
2) Where to put the metadata fields? Append to the existing physical fields?
If yes, I would suggest to change the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`

> SYSTEM_METADATA("partition")
Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA?

Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[email protected]> wrote:

Hi,

1. I thought a bit more on how the source would emit the columns and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that.
One additional thing I would introduce is an

interface SupportsMetadata {

    boolean supportsMetadata(Set<String> metadataFields);

    TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user defined fields, when emitting the metadata field I would prepend the column name with __system_{property_name}. Therefore when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny.
It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion. I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a udf in a partitioned by clause. I am fine with leaving out the partitioning by udf in the first version if you still have some concerns.

As for your question, Danny: it depends on which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause.
Usually Flink does not own the data and the partitions are already an intrinsic property of the underlying source, e.g. for Kafka we do not create topics, but we just describe a pre-existing pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata outputs from source

I think it is completely orthogonal. Computed column push down is another topic; it should not be a blocker but an enhancement. If we do not have any filters on the computed column, there is no need to push anything down: the source node just emits the complete record with full metadata with the declared physical schema, then when generating the virtual columns, we would extract the metadata info and output it as full columns (with full schema).
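
As a rough illustration of the flow described above (the source emits the complete record plus all metadata, and the virtual columns are derived afterwards), here is a minimal Java sketch. The class `VirtualColumnSketch`, its `project` method and the map-based record are hypothetical and not part of any Flink API; the `__system_` column prefix follows the naming idea mentioned elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class VirtualColumnSketch {

    /**
     * Builds the output row: all physical fields, plus one column per
     * requested metadata key, named __system_{key} to avoid clashes
     * with user-defined fields.
     */
    static Map<String, Object> project(Map<String, Object> physical,
                                       Map<String, Object> metadata,
                                       Set<String> requestedKeys) {
        Map<String, Object> row = new LinkedHashMap<>(physical);
        for (String key : requestedKeys) {
            if (!metadata.containsKey(key)) {
                throw new IllegalArgumentException("Unknown metadata key: " + key);
            }
            row.put("__system_" + key, metadata.get(key));
        }
        return row;
    }
}
```

In this sketch, requesting only "partition" adds a single `__system_partition` column; metadata that was not requested never reaches the row.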
About the type of metadata column

Personally I prefer an explicit type instead of CAST. They are semantically equivalent, though an explicit type is more straightforward and we can declare the nullable attribute there.

About option A: partitioning based on a computed column VS option B: partitioning with just a function

From the FLIP, it seems that B's partitioning is just a strategy when writing data; the partition column is not included in the table schema, so it is of no use when reading from that table.
- Compared to A, we do not need to generate the partition column when selecting from the table (but insert into)
- For A we can also mark the column as STORED when we want to persist that

So in my opinion they are orthogonal, we can support both. I saw that MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and the partitions are managed under a "tablenamespace"; the partition in which the record is stored is partition number N, where N = MOD(expr, num). For your design, in which partition would the record be persisted?
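
To make the N = MOD(expr, num) rule concrete, here is a small Java sketch. The class and method names are hypothetical, and the use of `Math.floorMod` is an assumption of the sketch: it keeps negative inputs inside [0, num), whereas SQL MOD in some databases returns a negative result for a negative dividend.

```java
/** Hypothetical sketch of HASH partitioning as N = MOD(expr, num). */
final class HashPartitionSketch {

    /**
     * Returns the partition number for an already evaluated partitioning
     * expression. floorMod is an assumption made here so that negative
     * values still map to a valid partition index.
     */
    static int partitionFor(long exprValue, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return (int) Math.floorMod(exprValue, (long) numPartitions);
    }
}
```

With four partitions, an expression value of 2005 lands in partition 1 under this rule.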
[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On Mar 2, 2020, 6:16 PM +0800, Dawid Wysakowicz <[email protected]> wrote:
Hi Jark,
Ad. 2 I added a section to discuss relation to FLIP-63
Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. Therefore you have the key.format.type.
I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives.
I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties.
Even format is in the end a connector property as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or if we go with FLINK-12557: elasticsearch):
elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....
I am fine with doing it though if this is a preferred approach in the community.
Ad in-line comments:
I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right?
As for the cast vs declaring output type of computed column.
I think it's better not to use CAST, but to declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source. E.g. in case of Kafka's offset, I think it's better to push down long rather than string. This could let us push expressions like e.g. offset > 12345 && offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover I think we need to introduce the type for computed columns anyway to support functions that infer output type based on expected return type.
As for the computed column push down. Yes, SYSTEM_METADATA would have to be pushed down to the source.
If it is not possible, the planner should fail. As far as I know computed columns push down will be part of source rework, won't it? ;)
As for the persisted computed column. I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must be also computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table.
Hope this answers some of your questions. Looking forward to further suggestions.
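
The offset range filter discussed above can be sketched in Java to show why the native type is simpler to push down. This is only an illustration: `OffsetFilterSketch` and its fields are hypothetical names, and real filter push-down in a planner would operate on expression trees rather than lambdas.

```java
import java.util.function.LongPredicate;
import java.util.function.Predicate;

/** Hypothetical sketch; not a Flink or Kafka API. */
final class OffsetFilterSketch {

    /** Filter on the native long offset: offset > 12345 && offset < 59382. */
    static final LongPredicate NATIVE =
            offset -> offset > 12345L && offset < 59382L;

    /** Same filter when the offset arrives as a string and must be cast first. */
    static final Predicate<String> WITH_CAST = text -> {
        long offset = Long.parseLong(text); // stands in for the extra cast step
        return offset > 12345L && offset < 59382L;
    };
}
```

Both predicates accept the same offsets; the second one just carries an extra conversion step that a source would have to understand before it could apply the filter.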
Best,
Dawid

On 02/03/2020 05:18, Jark Wu wrote:
Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from source is an important feature for streaming users.

In general, I agree with the proposal of the FLIP.
I will leave my thoughts and comments here:

1) +1 to use connector properties instead of introducing HEADER keyword, for the reason you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain what's the relationship between them.
Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL.
Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in properties may mislead users into thinking that the field is a rowtime attribute.

I also left some minor comments in the FLIP.

Thanks,
Jark

On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[email protected]> wrote:
Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records.
Besides the main payload, the majority (if not all) of the sources expose additional information. It can be simply read-only metadata such as offset or ingestion time, or read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, this proposal also discusses the partitioning when writing data out.

I am looking forward to your comments.
You can access the FLIP here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,

Dawid
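For readers skimming the thread, the idea in the original proposal (table columns read from different parts of a source record) can be sketched roughly as follows. This is only an illustration in plain Python, not Flink code and not the syntax proposed in the FLIP: `SourceRecord`, `to_row`, the spec strings such as "value.amount", and the sample values are all hypothetical names made up for this sketch.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class SourceRecord:
    """Hypothetical stand-in for a Kafka-style record; not a Flink class."""
    key: str                # read/write part, also used for partitioning/compaction
    value: Dict[str, Any]   # main payload
    timestamp: int          # read/write part (epoch millis, by assumption)
    offset: int             # read-only metadata


def to_row(record: SourceRecord, columns: Dict[str, str]) -> Dict[str, Any]:
    """Build a table row; each column names the record part it is read from.

    A spec is "key", "timestamp", "offset", or "value.<field>".
    """
    row = {}
    for column, spec in columns.items():
        part, _, field = spec.partition(".")
        if part == "value":
            row[column] = record.value[field]
        elif part in ("key", "timestamp", "offset"):
            row[column] = getattr(record, part)
        else:
            raise ValueError(f"unknown record part: {spec!r}")
    return row


# Sample data, invented for illustration only.
record = SourceRecord(key="user-1", value={"amount": 42},
                      timestamp=1583101800000, offset=7)
columns = {
    "user_id": "key",
    "amount": "value.amount",
    "event_time": "timestamp",
    "kafka_offset": "offset",
}
row = to_row(record, columns)
```

The column-to-part mapping is kept as plain data here so that the same row shape can be assembled from the payload, the key, and the metadata; how this is actually declared in a table definition is what the thread above is discussing.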
