The new syntax looks good to me. Best, Kurt
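For readers skimming the quoted thread below, here is a minimal Python sketch of the mental model behind the agreed syntax (`name TYPE METADATA [FROM 'key'] [VIRTUAL]`, persisted by default). The `Column` class and the two schema helpers are hypothetical illustrations, not Flink classes, and the rule that VIRTUAL columns are dropped from the query-to-sink schema is my reading of the discussion rather than a statement of the final implementation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Column:
    """Hypothetical model of one column declaration (not a Flink class)."""
    name: str
    data_type: str
    metadata: bool = False               # declared with METADATA
    metadata_key: Optional[str] = None   # FROM 'key'; falls back to the column name
    virtual: bool = False                # declared with VIRTUAL (read-only)

    def key(self) -> Optional[str]:
        # Physical columns have no metadata key at all.
        if not self.metadata:
            return None
        return self.metadata_key or self.name


def source_schema(columns: List[Column]) -> List[str]:
    # Every declared column can be read, whether physical, persisted or virtual.
    return [c.name for c in columns]


def sink_schema(columns: List[Column]) -> List[str]:
    # Assumption: only VIRTUAL metadata columns are excluded when writing.
    return [c.name for c in columns if not (c.metadata and c.virtual)]


table = [
    Column("id", "BIGINT"),
    Column("name", "STRING"),
    # timestamp INT METADATA
    Column("timestamp", "INT", metadata=True),
    # offset INT METADATA VIRTUAL
    Column("offset", "INT", metadata=True, virtual=True),
    # msg_headers MAP<STRING, BYTES> METADATA FROM 'headers'
    Column("msg_headers", "MAP<STRING, BYTES>", metadata=True, metadata_key="headers"),
]

print(source_schema(table))
print(sink_schema(table))
```

Under these assumptions one table declaration serves both reading and writing: `offset` is visible to queries but is not expected in an `INSERT INTO`.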
On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Timo,
>
> I have one minor suggestion.
> Maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH LOCAL TIME ZONE`, because this is the type that users want to use, and it avoids unnecessary casting.
> Besides, a BIGINT is currently cast to a timestamp in seconds, so the implicit cast may not work...
>
> I don't have other objections. But maybe we should wait for the opinion from @Kurt on the new syntax.
>
> Best,
> Jark
>
> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:
>
>> Thanks for driving this Timo, +1 for voting ~
>>
>> Best,
>> Danny Chan
>>
>> On Sep 10, 2020 at 3:47 PM (+0800), Timo Walther <twal...@apache.org> wrote:
>> > Thanks everyone for this healthy discussion. I updated the FLIP with the outcome. I think the result is very powerful but also very easy to declare. Thanks for all the contributions.
>> >
>> > If there are no objections, I would continue with a vote.
>> >
>> > What do you think?
>> >
>> > Regards,
>> > Timo
>> >
>> > On 09.09.20 16:52, Timo Walther wrote:
>> > > "If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column."
>> > >
>> > > Thanks for this nice mental model explanation, Jark. This makes total sense to me. Also, making the most common case as short as just adding `METADATA` is a very good idea. Thanks, Danny!
>> > >
>> > > Let me update the FLIP again with all these ideas.
>> > >
>> > > Regards,
>> > > Timo
>> > >
>> > > On 09.09.20 15:03, Jark Wu wrote:
>> > > > I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>> > > > Especially I like the shortcut: timestamp INT METADATA. This supports the most common case in the simplest way.
>> > > >
>> > > > I also think the default should be "PERSISTED", so VIRTUAL is optional when you are accessing read-only metadata. Because:
>> > > > 1. The "timestamp INT METADATA" should be a normal column, because "METADATA" is just a modifier to indicate it is from metadata, and a normal column should be persisted.
>> > > >    If virtual by default, when a user types "timestamp int" ==> persisted column, then adds a "metadata" after that ==> virtual column, then adds a "persisted" after that ==> persisted column.
>> > > >    I think this flips back and forth several times and confuses users. Physical fields are also declared as "fieldName TYPE", so it is very straightforward that "timestamp INT METADATA" is persisted.
>> > > > 2. From the collected user questions [1], we can see that "timestamp" is the most common use case. "timestamp" is read-write metadata. Persisted by default doesn't break the reading behavior.
>> > > >
>> > > > Best,
>> > > > Jark
>> > > >
>> > > > [1]: https://issues.apache.org/jira/browse/FLINK-15869
>> > > >
>> > > > On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
>> > > >
>> > > > > Thanks @Dawid for the nice summary, I think you captured all opinions of the long discussion well.
>> > > > >
>> > > > > @Danny
>> > > > > “timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>> > > > > Note that the "FROM 'field name'" is only needed when the name conflicts with the declared table column name; when there are no conflicts, we can simplify it to
>> > > > > timestamp INT METADATA"
>> > > > >
>> > > > > I really like the proposal; there is no confusion with computed columns any more, and it's concise enough.
>> > > > >
>> > > > > @Timo @Dawid
>> > > > > “We use `SYSTEM_TIME` for temporal tables.
>> > > > > I think prefixing with SYSTEM makes it clearer that it comes magically from the system.”
>> > > > > “As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I very much prefer the SYSTEM_ prefix.”
>> > > > >
>> > > > > I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`.
>> > > > > First of all, the word `TIME` has broad meanings, but the word `METADATA` does not; `METADATA` has a specific meaning.
>> > > > > Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard, but `SYSTEM_METADATA` does not.
>> > > > > Personally, I prefer the simpler way; sometimes less is more.
>> > > > >
>> > > > > Best,
>> > > > > Leonard
>> > > > >
>> > > > > > On Wed, Sep 9, 2020 at 6:41 PM, Timo Walther <twal...@apache.org> wrote:
>> > > > > >
>> > > > > > > Hi everyone,
>> > > > > > >
>> > > > > > > "key" and "value" in the properties are a special case because they need to configure a format. So key and value are more than just metadata. Jark's example for setting a timestamp would work but, as the FLIP discusses, we have way more metadata fields like headers, epoch-leader, etc. Having a property for all of this metadata would mess up the WITH section entirely. Furthermore, we also want to deal with metadata from the formats. Solving this through properties as well would further complicate the property design.
>> > > > > > >
>> > > > > > > Personally, I still like the computed column design more because it allows full flexibility to compute the final column:
>> > > > > > >
>> > > > > > > timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>> > > > > > >
>> > > > > > > Instead of having a helper column and a real column in the table:
>> > > > > > >
>> > > > > > > helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>> > > > > > > realTimestamp AS adjustTimestamp(helperTimestamp)
>> > > > > > >
>> > > > > > > But I see that the discussion leans towards:
>> > > > > > >
>> > > > > > > timestamp INT SYSTEM_METADATA("ts")
>> > > > > > >
>> > > > > > > Which is fine with me. It is the shortest solution, because we don't need an additional CAST. We can discuss the syntax, so that confusion with computed columns can be avoided.
>> > > > > > >
>> > > > > > > timestamp INT USING SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>> > > > > > >
>> > > > > > > We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM makes it clearer that it comes magically from the system.
>> > > > > > >
>> > > > > > > What do you think?
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Timo
>> > > > > > >
>> > > > > > > On 09.09.20 11:41, Jark Wu wrote:
>> > > > > > > > Hi Danny,
>> > > > > > > >
>> > > > > > > > This is not Oracle and MySQL computed column syntax, because there is no "AS" after the type.
>> > > > > > > >
>> > > > > > > > Hi everyone,
>> > > > > > > >
>> > > > > > > > If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink schema problem.
>> > > > > > > > Personally, I think we can use a shorter keyword "METADATA" for "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function and may lead users to think this is a computed column.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Jark
>> > > > > > > >
>> > > > > > > > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
>> > > > > > > >
>> > > > > > > > > "offset INT SYSTEM_METADATA("offset")"
>> > > > > > > > >
>> > > > > > > > > This is actually Oracle or MySQL style computed column syntax.
>> > > > > > > > >
>> > > > > > > > > "You are right that one could argue that "timestamp", "headers" are something like "key" and "value""
>> > > > > > > > >
>> > > > > > > > > I have the same feeling: both key/value and headers/timestamp are *real* data stored in the consumed record; they are not computed or generated.
>> > > > > > > > >
>> > > > > > > > > "Trying to solve everything via properties sounds rather like a hack to me"
>> > > > > > > > >
>> > > > > > > > > Things are not that hacky if we can unify the routines or the definitions (all via computed columns or all via table options). I also think that it is hacky to mix two kinds of syntax for different kinds of metadata (read-only and read-write).
>> > > > > > > > > In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata; that is hacky, or at least inconsistent.
>> > > > > > > > >
>> > > > > > > > > On Wed, Sep 9, 2020 at 4:48 PM, Kurt Young <ykt...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > I would vote for `offset INT SYSTEM_METADATA("offset")`.
>> > > > > > > > > >
>> > > > > > > > > > I don't think we can stick with the SQL standard in the DDL part forever, especially as there are more and more requirements coming from different connectors and external systems.
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Kurt
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Jark,
>> > > > > > > > > > >
>> > > > > > > > > > > now we are back at the original design proposed by Dawid :D Yes, we should be cautious about adding new syntax. But the length of this discussion shows that we are looking for a good long-term solution. In this case I would rather vote for a deep integration into the syntax.
>> > > > > > > > > > >
>> > > > > > > > > > > Computed columns are also not SQL standard compliant. And our DDL is neither, so we have some degree of freedom here.
>> > > > > > > > > > >
>> > > > > > > > > > > Trying to solve everything via properties sounds rather like a hack to me.
>> > > > > > > > > > > You are right that one could argue that "timestamp", "headers" are something like "key" and "value". However, mixing
>> > > > > > > > > > >
>> > > > > > > > > > > `offset AS SYSTEM_METADATA("offset")`
>> > > > > > > > > > >
>> > > > > > > > > > > and
>> > > > > > > > > > >
>> > > > > > > > > > > `'timestamp.field' = 'ts'`
>> > > > > > > > > > >
>> > > > > > > > > > > looks more confusing to users than an explicit
>> > > > > > > > > > >
>> > > > > > > > > > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>> > > > > > > > > > >
>> > > > > > > > > > > or
>> > > > > > > > > > >
>> > > > > > > > > > > `offset INT SYSTEM_METADATA("offset")`
>> > > > > > > > > > >
>> > > > > > > > > > > that is symmetric for both source and sink.
>> > > > > > > > > > >
>> > > > > > > > > > > What do others think?
>> > > > > > > > > > >
>> > > > > > > > > > > Regards,
>> > > > > > > > > > > Timo
>> > > > > > > > > > >
>> > > > > > > > > > > On 09.09.20 10:09, Jark Wu wrote:
>> > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think we have reached the conclusion that writable metadata shouldn't be defined as a computed column, but as a normal column.
>> > > > > > > > > > > >
>> > > > > > > > > > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches. However, it is not SQL standard compliant, so we need to be cautious when adding new syntax.
>> > > > > > > > > > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to resolve the query-sink schema problem if it is read-only metadata. That adds more stuff for users to learn.
>> > > > > > > > > > > >
>> > > > > > > > > > > > From my point of view, "timestamp" and "headers" are something like "key" and "value" that are stored with the real data. So why not define "timestamp" in the same way as "key", by using a "timestamp.field" connector option?
>> > > > > > > > > > > > On the other hand, read-only metadata, such as "offset", shouldn't be defined as a normal column. So why not use the existing computed column syntax for such metadata? Then we don't have the query-sink schema problem.
>> > > > > > > > > > > > So here is my proposal:
>> > > > > > > > > > > >
>> > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > >   id BIGINT,
>> > > > > > > > > > > >   name STRING,
>> > > > > > > > > > > >   col1 STRING,
>> > > > > > > > > > > >   col2 STRING,
>> > > > > > > > > > > >   ts TIMESTAMP(3) WITH LOCAL TIME ZONE,  -- ts is a normal field, so it can be read and written.
>> > > > > > > > > > > >   offset AS SYSTEM_METADATA("offset")
>> > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > >   'connector' = 'kafka',
>> > > > > > > > > > > >   'topic' = 'test-topic',
>> > > > > > > > > > > >   'key.fields' = 'id, name',
>> > > > > > > > > > > >   'key.format' = 'csv',
>> > > > > > > > > > > >   'value.format' = 'avro',
>> > > > > > > > > > > >   'timestamp.field' = 'ts'  -- define the mapping of Kafka timestamp
>> > > > > > > > > > > > );
>> > > > > > > > > > > >
>> > > > > > > > > > > > INSERT INTO kafka_table
>> > > > > > > > > > > > SELECT id, name, col1, col2, rowtime FROM another_table;
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think this can solve all the problems without introducing any new syntax.
>> > > > > > > > > > > > The only minor disadvantage is that we separate the definition way/syntax of read-only metadata and read-write fields.
>> > > > > > > > > > > > However, I don't think this is a big problem.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Jark
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi Kurt,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > thanks for sharing your opinion. I'm totally up for not reusing computed columns. I think Jark was a big supporter of this syntax, @Jark are you fine with this as well? The non-computed column approach was only a "slightly rejected alternative".
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Furthermore, we would need to think about how such a new design influences the LIKE clause though.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > However, we should still keep the `PERSISTED` keyword as it influences the query->sink schema. If you look at the list of metadata for existing connectors and formats, we currently offer only two writable metadata fields. Otherwise, one would need to declare two tables whenever a metadata column is read (one for the source, one for the sink). This can be quite inconvenient, e.g. for just reading the topic.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > Timo
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On 09.09.20 08:52, Kurt Young wrote:
>> > > > > > > > > > > > > > I also share the concern that reusing the computed column syntax but with different semantics would confuse users a lot.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Besides, I think metadata fields are conceptually not the same as computed columns. The metadata field is a connector-specific thing and it only contains the information about where the field comes from (during source) or where the field needs to be written to (during sink).
>> > > > > > > > > > > > > > It's more similar to normal fields, with the assumption that all these fields need to go to the data part.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thus I lean more towards the rejected alternative that Timo mentioned. And I think we don't need the PERSISTED keyword, SYSTEM_METADATA should be enough.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > During implementation, the framework only needs to pass such <field, metadata field> information to the connector, and the logic of handling such fields inside the connector should be straightforward.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Regarding the downside Timo mentioned:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > The disadvantage is that users cannot call UDFs or parse timestamps.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I think this is fairly simple to solve. Since the metadata field isn't a computed column anymore, we can support referencing such fields in the computed column.
>> > > > > > > > > > > > > > For example:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > >   id BIGINT,
>> > > > > > > > > > > > > >   name STRING,
>> > > > > > > > > > > > > >   timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the timestamp field from metadata
>> > > > > > > > > > > > > >   ts AS to_timestamp(timestamp)  -- normal computed column, parse the string to TIMESTAMP type by using the metadata field
>> > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > >   ...
>> > > > > > > > > > > > > > )
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > Kurt
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hi Leonard,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > the only alternative I see is that we introduce a concept that is completely different from computed columns. This is also mentioned in the rejected alternative section of the FLIP.
>> > > > > > > > > > > > > > > Something like:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > > >   id BIGINT,
>> > > > > > > > > > > > > > >   name STRING,
>> > > > > > > > > > > > > > >   timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
>> > > > > > > > > > > > > > >   headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
>> > > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > > >   ...
>> > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > This way we would avoid confusion entirely and can easily map columns to metadata columns. The disadvantage is that users cannot call UDFs or parse timestamps. This would need to be done in a real computed column.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I'm happy about better alternatives.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On 08.09.20 15:37, Leonard Xu wrote:
>> > > > > > > > > > > > > > > > Hi Timo,
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thanks for driving this FLIP.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Sorry, but I have a concern about the "Writing metadata via DynamicTableSink" section:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > > > >   id BIGINT,
>> > > > > > > > > > > > > > > >   name STRING,
>> > > > > > > > > > > > > > > >   timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
>> > > > > > > > > > > > > > > >   headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
>> > > > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > > > >   ...
>> > > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > > > An insert statement could look like:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > INSERT INTO kafka_table VALUES (
>> > > > > > > > > > > > > > > >   (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
>> > > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > The proposed INSERT syntax does not make sense to me, because it contains computed (generated) columns.
>> > > > > > > > > > > > > > > > Neither SQL Server nor PostgreSQL allows inserting values into computed columns, even if they are persisted. This breaks the generated column semantics and may confuse users a lot.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > For SQL Server computed column [1]:
>> > > > > > > > > > > > > > > > > column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
>> > > > > > > > > > > > > > > > > NOTE: A computed column cannot be the target of an INSERT or UPDATE statement.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > For PostgreSQL generated column [2]:
>> > > > > > > > > > > > > > > > > height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
>> > > > > > > > > > > > > > > > > NOTE: A generated column cannot be written to directly. In INSERT or UPDATE commands, a value cannot be specified for a generated column, but the keyword DEFAULT may be specified.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > After looking up SQL:2016, it also shouldn't be allowed to set or update a value for a generated column:
>> > > > > > > > > > > > > > > > > <insert statement> ::=
>> > > > > > > > > > > > > > > > > INSERT INTO <insertion target> <insert columns and source>
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > If <contextually typed table value constructor> CTTVC is specified, then every <contextually typed row value constructor element> simply contained in CTTVC whose positionally corresponding <column name> in <insert column list> references a column of which some underlying column is a generated column shall be a <default specification>.
>> > > > > > > > > > > > > > > > > A <default specification> specifies the default value of some associated item.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>> > > > > > > > > > > > > > > > [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hi Jark,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > according to Flink's and Calcite's casting definitions in [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we will make it possible ;-)
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > I'm aware of DeserializationSchema.getProducedType but I think that this method is actually misplaced.
>> > > > > > > > > > > > > > > > > The type should rather be passed to the source itself.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > For our Kafka SQL source, we will also not use this method because the Kafka source will add its own metadata in addition to the DeserializationSchema. So DeserializationSchema.getProducedType will never be read.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > For now I suggest leaving out the `DataType` from DecodingFormat.applyReadableMetadata, also because the format's physical type is passed later in `createRuntimeDecoder`. If necessary, it can be computed manually from consumedType + metadata types. We will provide a metadata utility class for that.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>> > > > > > > > > > > > > > > > > [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On 08.09.20 10:52, Jark Wu wrote:
>> > > > > > > > > > > > > > > > > > Hi Timo,
>> > > > > > > > > > > > > > > > > > The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE". So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME ZONE" as the defined type of Kafka timestamp? I think this makes sense, because it represents the milliseconds since epoch.
>> > > > > > > > > > > > > > > > > > Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so. The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo.
>> > > > > > > > > > > > > > > > > > Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].
>> > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > Jark
>> > > > > > > > > > > > > > > > > > [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>> > > > > > > > > > > > > > > > > > On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > I updated the FLIP again and hope that I could address the mentioned concerns.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics.
I updated the FLIP and expose the most commonly used properties separately. So frequently used properties are not hidden in the MAP anymore:

    debezium-json.ingestion-timestamp
    debezium-json.source.timestamp
    debezium-json.source.database
    debezium-json.source.schema
    debezium-json.source.table

However, since other properties depend on the used connector/vendor, the remaining options are stored in:

    debezium-json.source.properties

And accessed with:

    CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']

Otherwise it is not possible to figure out the value and column type during validation.

@Jark: You convinced me in relaxing the CAST constraints.
I added a dedicated sub-section to the FLIP:

For making the use of SYSTEM_METADATA easier and to avoid nested casting, we allow explicit casting to a target data type:

    rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always executed locally.
It is the source that needs TypeInfo for serializing the record to the next operator. And that is what we provide.

@Danny:

“`SYSTEM_METADATA("offset")` returns the NULL type by default”

We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.
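As a rough illustration of the key layout described in this mail, the sketch below models the metadata as a plain Java map: the frequently used properties sit under their own keys, and everything vendor-specific is reached through the `debezium-json.source.properties` map. This is not the format's implementation. The class, the method names and the sample values (borrowed from the Debezium record in Leonard's mail) are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class DebeziumMetadataExample {

    /** Builds illustrative metadata for one changelog record (sample values only). */
    public static Map<String, Object> sampleMetadata() {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector", "mysql");
        properties.put("table", "products");

        Map<String, Object> metadata = new HashMap<>();
        // Frequently used properties are exposed under dedicated keys.
        metadata.put("debezium-json.ingestion-timestamp", 1589355606100L);
        metadata.put("debezium-json.source.timestamp", 1589355504100L);
        metadata.put("debezium-json.source.database", "inventory");
        metadata.put("debezium-json.source.table", "products");
        // Vendor-specific leftovers stay in a string-to-string map.
        metadata.put("debezium-json.source.properties", properties);
        return metadata;
    }

    /**
     * Mirrors the map access
     * CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['key'].
     */
    @SuppressWarnings("unchecked")
    public static String sourceProperty(Map<String, Object> metadata, String key) {
        Map<String, String> properties =
                (Map<String, String>) metadata.get("debezium-json.source.properties");
        return properties == null ? null : properties.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = sampleMetadata();
        System.out.println(metadata.get("debezium-json.source.table"));
        System.out.println(sourceProperty(metadata, "connector"));
    }
}
```

The dedicated keys have a fixed value type that can be checked up front, while a lookup in the properties map only ever yields a string, which matches the validation argument made above.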
Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:

Hi, Timo

Thanks for your explanation and update, I have only one question for the latest FLIP.

About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if users want to use the table name metadata, they need to write:

    tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?

    tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
    transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way we can simplify the expression. From my side, the mainly used metadata in changelog formats are 'database', 'table', 'source.ts_ms' and 'ts_ms'; maybe we could support only them in the first version.

Both Debezium and Canal have the above four metadata, and I'm willing to take some subtasks in the next development if necessary.

Debezium:
{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e. transaction time in the database.
    "connector": "mysql",
    ….
  },
  "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:
{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e. transaction time in the database.
  "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On Sep 8, 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:

1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner.”

What are the pros of following the SQL Server syntax here? Usually an expression return type can be inferred automatically.
But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.

And why not use the Oracle or MySQL syntax there?

    column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

Which is more straightforward.

2. “`SYSTEM_METADATA("offset")` returns the NULL type by default”

The default type should not be NULL because only the NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On Sep 4, 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

I completely reworked FLIP-107. It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:

Sorry, forgot one question.

4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to ignore just the timestamp but keep the key.
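One way to read the orthogonality suggestion is to treat each exclusion as an independent flag that can be combined freely. The sketch below shows that idea with an `EnumSet`. `EXCEPT_KEY` and `EXCEPT_TIMESTAMP` are the values named in this mail; the enum, the parser and the comma-separated format are hypothetical and do not describe an existing Flink option.

```java
import java.util.EnumSet;
import java.util.Locale;

public class FieldsIncludeExample {

    /** Independent exclusion flags, as suggested in the mail. */
    public enum Exclusion { EXCEPT_KEY, EXCEPT_TIMESTAMP }

    /** Parses a comma-separated option value such as "EXCEPT_KEY, EXCEPT_TIMESTAMP". */
    public static EnumSet<Exclusion> parse(String option) {
        EnumSet<Exclusion> result = EnumSet.noneOf(Exclusion.class);
        for (String token : option.split(",")) {
            String name = token.trim().toUpperCase(Locale.ROOT);
            if (!name.isEmpty()) {
                // valueOf throws IllegalArgumentException for unknown flags.
                result.add(Exclusion.valueOf(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Ignore only the timestamp but keep the key, the case that
        // EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP alone cannot express.
        System.out.println(parse("EXCEPT_TIMESTAMP"));
        System.out.println(parse("EXCEPT_KEY, EXCEPT_TIMESTAMP"));
    }
}
```

With independent flags, every combination is expressible without adding a new enum constant per combination.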

Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:

Hi Dawid,

I have a couple of questions around key fields. Actually I also have some other questions, but I want to focus on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during write operations? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in write operations, I want to propose that we simplify the options by not introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of key and values. The old parameter "format.type" is already good enough to use.

Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Dawid,

I have two more questions.

> SupportsMetadata
Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where to put the metadata fields? Append to the existing physical fields?
If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`

> SYSTEM_METADATA("partition")
Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA?

Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

1. I thought a bit more on how the source would emit the columns and I now see it's not exactly the same as regular columns.
I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that. One additional thing I would introduce is an

    interface SupportsMetadata {

      boolean supportsMetadata(Set<String> metadataFields);

      TableSource generateMetadataFields(Set<String> metadataFields);

    }

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting the metadata field I would prepend the column name with __system_{property_name}.
Therefore when requested SYSTEM_METADATA("partition") the source would append a field __system_partition to the schema. This would be never visible to the user as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on computed column vs function

Here I also agree with Danny. I also think those are orthogonal.
I would leave the STORED computed columns out of the discussion. I don't see how they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause. I am fine with leaving out the partitioning by UDF in the first version if you still have some concerns.

As for your question, Danny: it depends on which partitioning strategy you use.

For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num).
I am not sure though if we should introduce the PARTITIONS clause. Usually Flink does not own the data and the partitions are already an intrinsic property of the underlying source, e.g. for Kafka we do not create topics, but we just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key.
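The naming scheme from point 1 can be sketched as follows. This is a minimal illustration, assuming the `__system_` prefix described above; the class and its methods are hypothetical helpers that operate on plain field-name lists, and they are not the proposed `SupportsMetadata` interface or any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class SystemFieldNamingExample {

    static final String PREFIX = "__system_";

    /** Maps a requested metadata key to its internal column name. */
    public static String internalName(String metadataKey) {
        return PREFIX + metadataKey;
    }

    /** Appends only the requested metadata fields after the physical fields. */
    public static List<String> appendMetadataFields(
            List<String> physicalFields, Set<String> requestedMetadata) {
        List<String> schema = new ArrayList<>(physicalFields);
        for (String key : requestedMetadata) {
            String name = internalName(key);
            if (schema.contains(name)) {
                throw new IllegalArgumentException("Field name clash: " + name);
            }
            schema.add(name);
        }
        return schema;
    }

    public static void main(String[] args) {
        // SYSTEM_METADATA("partition") would lead to an extra __system_partition field.
        System.out.println(appendMetadataFields(
                Arrays.asList("id", "name"), Collections.singleton("partition")));
    }
}
```

The prefixed field only exists in the intermediate schema, so a later projection for the computed columns can drop it before anything becomes visible to the user.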

Best,

Dawid

On 03/03/2020 09:00, Danny Chan wrote:

Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata outputs from source

I think it is completely orthogonal. Computed column push-down is another topic; this should not be a blocker but a promotion. If we do not have any filters on the computed column, there is no need to do any push-down; the source node just emits the complete record with full metadata with the declared physical schema, then when generating the virtual columns, we would extract the metadata info and output it as full columns (with full schema).

About the type of metadata column

Personally I prefer an explicit type instead of CAST. They are semantically equivalent though; an explicit type is more straightforward and we can declare the nullable attribute there.

About option A (partitioning based on a computed column) vs. option B (partitioning with just a function)

From the FLIP, it seems that B's partitioning is just a strategy for writing data. The partition column is not included in the table schema, so it is of no use when reading from the table.

- Compared to A, we do not need to generate the partition column when selecting from the table (only when inserting into it).
- For A, we can also mark the column as STORED when we want to persist it.

So in my opinion they are orthogonal, and we can support both. I saw that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num, and the partitions are managed under a "tablenamespace"; the partition in which a record is stored is partition number N, where N = MOD(expr, num). In your design, in which partition would the record be persisted?
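
As a rough illustration of the N = MOD(expr, num) rule mentioned above, here is a minimal Python sketch. The function name `hash_partition` and the sample records are made up for illustration; this is not Flink, MySQL, or Oracle code, just the arithmetic.

```python
def hash_partition(expr_value: int, num_partitions: int) -> int:
    """Return partition number N = MOD(expr, num) for an integer expression value."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # Python's % returns a non-negative result for a positive divisor.
    return expr_value % num_partitions


# Hypothetical records: (record id, value of the partitioning expression).
records = [("a", 2005), ("b", 2006), ("c", 2008)]
assignment = {rid: hash_partition(value, 4) for rid, value in records}
print(assignment)
```

With an explicit partition count, the target partition of every record is fully determined by the expression value, which is the property the question above is asking about.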

[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On March 2, 2020 at 6:16 PM (+0800), Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Jark,

Ad. 2: I added a section to discuss the relation to FLIP-63.

Ad. 3: Yes, I also tried to somewhat keep a hierarchy of properties. That is why you have key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka).
I should've put that into an Option/Rejected Alternatives section.

I agree that timestamp, key.*, and value.* are connector properties. The reason I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though, if this is the preferred approach in the community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be value.fields-include, which I think you also suggested in the comment, right?

As for the cast vs. declaring the output type of a computed column: I think it's better not to use CAST, but to declare the type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push-downs when working with the native types of the source. E.g. in the case of Kafka's offset, I think it's better to push down long rather than string. This could let us push down expressions like offset > 12345 && offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer the output type based on the expected return type.

As for the computed column push-down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push-down will be part of the source rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY vs. partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table.
If you use a UDF in PARTITIONED BY, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.

Best,
Dawid

On 02/03/2020 05:18, Jark Wu wrote:

Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.

In general, I agree with the proposal of the FLIP.
I will leave my thoughts and comments here:

1) +1 to using connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?
3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties: FLINK-12557.) A single "timestamp" in properties may mislead users into thinking that the field is a rowtime attribute.

I also left some minor comments in the FLIP.

Thanks,
Jark

On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority of sources (if not all) expose additional information. It can be simply read-only metadata such as offset or ingestion time, or read-and-write parts of the record that contain data but additionally serve different purposes (partitioning, compaction, etc.), e.g. the key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, it also discusses partitioning when writing data out.

I am looking forward to your comments.
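
To make the idea of reading columns from different parts of a record concrete, here is a small Python sketch. The `SourceRecord` class, its field names, and the `to_row` helper are hypothetical and only model a Kafka-like record; they are not part of Flink or of the FLIP.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceRecord:
    """Hypothetical model of a Kafka-like record (not a Flink class)."""
    key: dict        # read/write part, also used for partitioning and compaction
    value: dict      # main payload
    timestamp: int   # read/write part, assumed here to be epoch milliseconds
    offset: int      # read-only metadata


def to_row(record: SourceRecord) -> dict:
    """Flatten payload, key, and metadata into one table row."""
    row = dict(record.value)
    row["user_id"] = record.key["user_id"]   # column taken from the key
    row["ts"] = record.timestamp             # column taken from the record timestamp
    row["offset"] = record.offset            # column taken from read-only metadata
    return row


record = SourceRecord(key={"user_id": 7}, value={"item": "book", "qty": 2},
                      timestamp=1583101800000, offset=42)
print(to_row(record))
```

The sketch only covers the read direction; writing would need the inverse mapping for the writable parts (key, timestamp) and would leave read-only metadata such as the offset out.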

You can access the FLIP here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,

Dawid