Hi, Timo

Thanks for the update
I have a minor suggestion about the debezium metadata key,
Could we use the original  debezium key rather than import new key?  

debezium-json.schema                       => debezium-json.schema 
debezium-json.ingestion-timestamp  =>  debezium-json.ts_ms
debezium-json.source.database       =>  debezium-json.source.db
debezium-json.source.schema         =>  debezium-json.source.schema
debezium-json.source.table              =>  debezium-json.source.table
debezium-json.source.timestamp     =>  debezium-json.source.ts_ms
debezium-json.source.properties      =>  debezium-json.source MAP<STRING, 
STRING>
 
User who familiar with debezium will understand the key easier,  and the key 
syntax is more json-path like. HDYT?


The other part looks really good to me.


Regards,
Leonard


> 在 2020年9月10日,18:26,Aljoscha Krettek <aljos...@apache.org> 写道:
> 
> I've only been watching this from the sidelines but that latest proposal 
> looks very good to me!
> 
> Aljoscha
> 
> On 10.09.20 12:20, Kurt Young wrote:
>> The new syntax looks good to me.
>> Best,
>> Kurt
>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:
>>> Hi Timo,
>>> 
>>> I have one minor suggestion.
>>> Maybe the default data type of `timestamp`  can be `TIMESTAMP(3) WITH
>>> LOCAL TIME ZONE`, because this is the type that users want to use, this can
>>> avoid unnecessary casting.
>>> Besides, currently, the bigint is casted to timestamp in seconds, so the
>>> implicit cast may not work...
>>> 
>>> I don't have other objections. But maybe we should wait for the
>>> opinion from @Kurt for the new syntax.
>>> 
>>> Best,
>>> Jark
>>> 
>>> 
>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:
>>> 
>>>> Thanks for driving this Timo, +1 for voting ~
>>>> 
>>>> Best,
>>>> Danny Chan
>>>> 在 2020年9月10日 +0800 PM3:47,Timo Walther <twal...@apache.org>,写道:
>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>>>>> outcome. I think the result is very powerful but also very easy to
>>>>> declare. Thanks for all the contributions.
>>>>> 
>>>>> If there are no objections, I would continue with a voting.
>>>>> 
>>>>> What do you think?
>>>>> 
>>>>> Regards,
>>>>> Timo
>>>>> 
>>>>> 
>>>>> On 09.09.20 16:52, Timo Walther wrote:
>>>>>> "If virtual by default, when a user types "timestamp int" ==>
>>>> persisted
>>>>>> column, then adds a "metadata" after that ==> virtual column, then
>>>> adds
>>>>>> a "persisted" after that ==> persisted column."
>>>>>> 
>>>>>> Thanks for this nice mental model explanation, Jark. This makes total
>>>>>> sense to me. Also making the the most common case as short at just
>>>>>> adding `METADATA` is a very good idea. Thanks, Danny!
>>>>>> 
>>>>>> Let me update the FLIP again with all these ideas.
>>>>>> 
>>>>>> Regards,
>>>>>> Timo
>>>>>> 
>>>>>> 
>>>>>> On 09.09.20 15:03, Jark Wu wrote:
>>>>>>> I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
>>>>>>> 'my-timestamp-field'] [VIRTUAL]
>>>>>>> Especially I like the shortcut: timestamp INT METADATA, this makes
>>>> the
>>>>>>> most
>>>>>>> common case to be supported in the simplest way.
>>>>>>> 
>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is
>>>> optional
>>>>>>> when
>>>>>>> you are accessing a read-only metadata. Because:
>>>>>>> 1. The "timestamp INT METADATA" should be a normal column, because
>>>>>>> "METADATA" is just a modifier to indicate it is from metadata, a
>>>> normal
>>>>>>> column should be persisted.
>>>>>>>      If virtual by default, when a user types "timestamp int" ==>
>>>>>>> persisted
>>>>>>> column, then adds a "metadata" after that ==> virtual column, then
>>>> adds a
>>>>>>> "persisted" after that ==> persisted column.
>>>>>>>      I think this looks reversed several times and makes users
>>>> confused.
>>>>>>> Physical fields are also prefixed with "fieldName TYPE", so
>>>> "timestamp
>>>>>>> INT
>>>>>>> METADATA" is persisted is very straightforward.
>>>>>>> 2. From the collected user question [1], we can see that "timestamp"
>>>>>>> is the
>>>>>>> most common use case. "timestamp" is a read-write metadata.
>>>> Persisted by
>>>>>>> default doesn't break the reading behavior.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Jark
>>>>>>> 
>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869
>>>>>>> 
>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Thanks @Dawid for the nice summary, I think you catch all
>>>> opinions of
>>>>>>>> the
>>>>>>>> long discussion well.
>>>>>>>> 
>>>>>>>> @Danny
>>>>>>>> “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>>>>>>>>   Note that the "FROM 'field name'" is only needed when the name
>>>>>>>> conflict
>>>>>>>>   with the declared table column name, when there are no
>>>> conflicts,
>>>>>>>> we can
>>>>>>>> simplify it to
>>>>>>>>        timestamp INT METADATA"
>>>>>>>> 
>>>>>>>> I really like the proposal, there is no confusion with computed
>>>>>>>> column any
>>>>>>>> more,  and it’s concise enough.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> @Timo @Dawid
>>>>>>>> “We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>>>> SYSTEM
>>>>>>>> makes it clearer that it comes magically from the system.”
>>>>>>>> “As for the issue of shortening the SYSTEM_METADATA to METADATA.
>>>> Here I
>>>>>>>> very much prefer the SYSTEM_ prefix.”
>>>>>>>> 
>>>>>>>> I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a lot,
>>>>>>>> First of all,  the word `TIME` has broad meanings but the word
>>>>>>>> `METADATA `
>>>>>>>> not,  `METADATA ` has specific meaning,
>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
>>>>>>>> `SYSTEM_METADATA ` not.
>>>>>>>> Personally, I like more simplify way,sometimes  less is more.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Leonard
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道:
>>>>>>>>> 
>>>>>>>>>> Hi everyone,
>>>>>>>>>> 
>>>>>>>>>> "key" and "value" in the properties are a special case
>>>> because they
>>>>>>>>>> need
>>>>>>>>>> to configure a format. So key and value are more than just
>>>> metadata.
>>>>>>>>>> Jark's example for setting a timestamp would work but as the
>>>> FLIP
>>>>>>>>>> discusses, we have way more metadata fields like headers,
>>>>>>>>>> epoch-leader,
>>>>>>>>>> etc. Having a property for all of this metadata would mess up
>>>> the WITH
>>>>>>>>>> section entirely. Furthermore, we also want to deal with
>>>> metadata from
>>>>>>>>>> the formats. Solving this through properties as well would
>>>> further
>>>>>>>>>> complicate the property design.
>>>>>>>>>> 
>>>>>>>>>> Personally, I still like the computed column design more
>>>> because it
>>>>>>>>>> allows to have full flexibility to compute the final column:
>>>>>>>>>> 
>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
>>>>>>>> TIMESTAMP(3)))
>>>>>>>>>> 
>>>>>>>>>> Instead of having a helper column and a real column in the
>>>> table:
>>>>>>>>>> 
>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp)
>>>>>>>>>> 
>>>>>>>>>> But I see that the discussion leans towards:
>>>>>>>>>> 
>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts")
>>>>>>>>>> 
>>>>>>>>>> Which is fine with me. It is the shortest solution, because
>>>> we don't
>>>>>>>>>> need additional CAST. We can discuss the syntax, so that
>>>> confusion
>>>>>>>>>> with
>>>>>>>>>> computed columns can be avoided.
>>>>>>>>>> 
>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts")
>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts")
>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>>>>>>>>>> 
>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing
>>>> with
>>>>>>>>>> SYSTEM
>>>>>>>>>> makes it clearer that it comes magically from the system.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote:
>>>>>>>>>>> Hi Danny,
>>>>>>>>>>> 
>>>>>>>>>>> This is not Oracle and MySQL computed column syntax,
>>>> because there is
>>>>>>>> no
>>>>>>>>>>> "AS" after the type.
>>>>>>>>>>> 
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>> 
>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")",
>>>> then I
>>>>>>>>>>> think
>>>>>>>> we
>>>>>>>>>>> must further discuss about "PERSISED" or "VIRTUAL" keyword
>>>> for
>>>>>>>> query-sink
>>>>>>>>>>> schema problem.
>>>>>>>>>>> Personally, I think we can use a shorter keyword "METADATA"
>>>> for
>>>>>>>>>>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a
>>>> system
>>>>>>>>>> function
>>>>>>>>>>> and confuse users this looks like a computed column.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan <
>>>> danny0...@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")"
>>>>>>>>>>>> 
>>>>>>>>>>>> This is actually Oracle or MySQL style computed column
>>>> syntax.
>>>>>>>>>>>> 
>>>>>>>>>>>> "You are right that one could argue that "timestamp",
>>>> "headers" are
>>>>>>>>>>>> something like "key" and "value""
>>>>>>>>>>>> 
>>>>>>>>>>>> I have the same feeling, both key value and headers
>>>> timestamp are
>>>>>>>> *real*
>>>>>>>>>>>> data
>>>>>>>>>>>> stored in the consumed record, they are not computed or
>>>> generated.
>>>>>>>>>>>> 
>>>>>>>>>>>> "Trying to solve everything via properties sounds rather
>>>> like a hack
>>>>>>>> to
>>>>>>>>>>>> me"
>>>>>>>>>>>> 
>>>>>>>>>>>> Things are not that hack if we can unify the routines or
>>>> the
>>>>>>>> definitions
>>>>>>>>>>>> (all from the computed column way or all from the table
>>>> options), i
>>>>>>>> also
>>>>>>>>>>>> think that it is a hacky that we mix in 2 kinds of syntax
>>>> for
>>>>>>>> different
>>>>>>>>>>>> kinds of metadata (read-only and read-write). In this
>>>> FLIP, we
>>>>>>>>>>>> declare
>>>>>>>>>> the
>>>>>>>>>>>> Kafka key fields with table options but SYSTEM_METADATA
>>>> for other
>>>>>>>>>> metadata,
>>>>>>>>>>>> that is a hacky thing or something in-consistent.
>>>>>>>>>>>> 
>>>>>>>>>>>> Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:
>>>>>>>>>>>> 
>>>>>>>>>>>>>   I would vote for `offset INT
>>>> SYSTEM_METADATA("offset")`.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I don't think we can stick with the SQL standard in DDL
>>>> part
>>>>>>>>>>>>> forever,
>>>>>>>>>>>>> especially as there are more and more
>>>>>>>>>>>>> requirements coming from different connectors and
>>>> external systems.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <
>>>> twal...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> now we are back at the original design proposed by
>>>> Dawid :D
>>>>>>>>>>>>>> Yes, we
>>>>>>>>>>>>>> should be cautious about adding new syntax. But the
>>>> length of this
>>>>>>>>>>>>>> discussion shows that we are looking for a good
>>>> long-term
>>>>>>>>>>>>>> solution.
>>>>>>>> In
>>>>>>>>>>>>>> this case I would rather vote for a deep integration
>>>> into the
>>>>>>>> syntax.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant.
>>>> And our
>>>>>>>>>>>>>> DDL is
>>>>>>>>>>>>>> neither, so we have some degree of freedom here.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Trying to solve everything via properties sounds
>>>> rather like a
>>>>>>>>>>>>>> hack
>>>>>>>> to
>>>>>>>>>>>>>> me. You are right that one could argue that
>>>> "timestamp", "headers"
>>>>>>>> are
>>>>>>>>>>>>>> something like "key" and "value". However, mixing
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> `'timestamp.field' = 'ts'`
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> looks more confusing to users that an explicit
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")`
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> that is symetric for both source and sink.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> What do others think?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote:
>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I think we have a conclusion that the writable
>>>> metadata shouldn't
>>>>>>>> be
>>>>>>>>>>>>>>> defined as a computed column, but a normal column.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is
>>>> one of the
>>>>>>>>>>>>> approaches.
>>>>>>>>>>>>>>> However, it is not SQL standard compliant, we need
>>>> to be cautious
>>>>>>>>>>>>> enough
>>>>>>>>>>>>>>> when adding new syntax.
>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or
>>>> `VIRTUAL`
>>>>>>>>>>>>>>> keyword
>>>>>>>> to
>>>>>>>>>>>>>>> resolve the query-sink schema problem if it is
>>>> read-only
>>>>>>>>>>>>>>> metadata.
>>>>>>>>>>>> That
>>>>>>>>>>>>>>> adds more stuff to learn for users.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>  From my point of view, the "timestamp",
>>>> "headers" are something
>>>>>>>> like
>>>>>>>>>>>>>> "key"
>>>>>>>>>>>>>>> and "value" that stores with the real data. So why
>>>> not define the
>>>>>>>>>>>>>>> "timestamp" in the same way with "key" by using a
>>>>>>>>>>>>>>> "timestamp.field"
>>>>>>>>>>>>>>> connector option?
>>>>>>>>>>>>>>> On the other side, the read-only metadata, such as
>>>> "offset",
>>>>>>>>>>>> shouldn't
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> defined as a normal column. So why not use the
>>>> existing computed
>>>>>>>>>>>> column
>>>>>>>>>>>>>>> syntax for such metadata? Then we don't have the
>>>> query-sink
>>>>>>>>>>>>>>> schema
>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>> So here is my proposal:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>     id BIGINT,
>>>>>>>>>>>>>>>     name STRING,
>>>>>>>>>>>>>>>     col1 STRING,
>>>>>>>>>>>>>>>     col2 STRING,
>>>>>>>>>>>>>>>     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts
>>>> is a normal
>>>>>>>> field,
>>>>>>>>>>>> so
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>> be read and written.
>>>>>>>>>>>>>>>     offset AS SYSTEM_METADATA("offset")
>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>     'connector' = 'kafka',
>>>>>>>>>>>>>>>     'topic' = 'test-topic',
>>>>>>>>>>>>>>>     'key.fields' = 'id, name',
>>>>>>>>>>>>>>>     'key.format' = 'csv',
>>>>>>>>>>>>>>>     'value.format' = 'avro',
>>>>>>>>>>>>>>>     'timestamp.field' = 'ts'    -- define the
>>>> mapping of Kafka
>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>>> );
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> INSERT INTO kafka_table
>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM
>>>> another_table;
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I think this can solve all the problems without
>>>> introducing
>>>>>>>>>>>>>>> any new
>>>>>>>>>>>>>> syntax.
>>>>>>>>>>>>>>> The only minor disadvantage is that we separate the
>>>> definition
>>>>>>>>>>>>> way/syntax
>>>>>>>>>>>>>>> of read-only metadata and read-write fields.
>>>>>>>>>>>>>>> However, I don't think this is a big problem.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <
>>>> twal...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Kurt,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up
>>>> for not reusing
>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>> columns. I think Jark was a big supporter of this
>>>> syntax, @Jark
>>>>>>>> are
>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> fine with this as well? The non-computed column
>>>> approach was
>>>>>>>>>>>>>>>> only
>>>>>>>> a
>>>>>>>>>>>>>>>> "slightly rejected alternative".
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Furthermore, we would need to think about how
>>>> such a new design
>>>>>>>>>>>>>>>> influences the LIKE clause though.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED`
>>>> keyword as it
>>>>>>>>>>>> influences
>>>>>>>>>>>>>>>> the query->sink schema. If you look at the list
>>>> of metadata for
>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>> connectors and formats, we currently offer only
>>>> two writable
>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>> fields. Otherwise, one would need to declare two
>>>> tables
>>>>>>>>>>>>>>>> whenever a
>>>>>>>>>>>>>>>> metadata columns is read (one for the source, one
>>>> for the sink).
>>>>>>>>>>>> This
>>>>>>>>>>>>>>>> can be quite inconvientient e.g. for just reading
>>>> the topic.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote:
>>>>>>>>>>>>>>>>> I also share the concern that reusing the
>>>> computed column
>>>>>>>>>>>>>>>>> syntax
>>>>>>>>>>>> but
>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> different semantics
>>>>>>>>>>>>>>>>> would confuse users a lot.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Besides, I think metadata fields are
>>>> conceptually not the same
>>>>>>>> with
>>>>>>>>>>>>>>>>> computed columns. The metadata
>>>>>>>>>>>>>>>>> field is a connector specific thing and it only
>>>> contains the
>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>> that where does the field come
>>>>>>>>>>>>>>>>> from (during source) or where does the field
>>>> need to write to
>>>>>>>>>>>> (during
>>>>>>>>>>>>>>>>> sink). It's more similar with normal
>>>>>>>>>>>>>>>>> fields, with assumption that all these fields
>>>> need going to the
>>>>>>>>>>>> data
>>>>>>>>>>>>>>>> part.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thus I'm more lean to the rejected alternative
>>>> that Timo
>>>>>>>> mentioned.
>>>>>>>>>>>>>> And I
>>>>>>>>>>>>>>>>> think we don't need the
>>>>>>>>>>>>>>>>> PERSISTED keyword, SYSTEM_METADATA should be
>>>> enough.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> During implementation, the framework only needs
>>>> to pass such
>>>>>>>>>>>> <field,
>>>>>>>>>>>>>>>>> metadata field> information to the
>>>>>>>>>>>>>>>>> connector, and the logic of handling such
>>>> fields inside the
>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>> should be straightforward.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call
>>>> UDFs or parse
>>>>>>>>>>>> timestamps.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since
>>>> the metadata
>>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>> isn't
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> computed column anymore, we can support
>>>>>>>>>>>>>>>>> referencing such fields in the computed column.
>>>> For example:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>         id BIGINT,
>>>>>>>>>>>>>>>>>         name STRING,
>>>>>>>>>>>>>>>>>         timestamp STRING
>>>> SYSTEM_METADATA("timestamp"),  //
>>>>>>>>>>>>>>>>> get the
>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>>>>>>> field from metadata
>>>>>>>>>>>>>>>>>         ts AS to_timestamp(timestamp) // normal
>>>> computed
>>>>>>>>>>>>>>>>> column,
>>>>>>>>>>>> parse
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> string to TIMESTAMP type by using the metadata
>>>> field
>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>        ...
>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther
>>>>>>>>>>>>>>>>> <twal...@apache.org
>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Leonard,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> the only alternative I see is that we
>>>> introduce a concept that
>>>>>>>> is
>>>>>>>>>>>>>>>>>> completely different to computed columns.
>>>> This is also
>>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> rejected alternative section of the FLIP.
>>>> Something like:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>         id BIGINT,
>>>>>>>>>>>>>>>>>>         name STRING,
>>>>>>>>>>>>>>>>>>         timestamp INT
>>>> SYSTEM_METADATA("timestamp") PERSISTED,
>>>>>>>>>>>>>>>>>>         headers MAP<STRING, BYTES>
>>>> SYSTEM_METADATA("headers")
>>>>>>>>>>>>> PERSISTED
>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>        ...
>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> This way we would avoid confusion at all and
>>>> can easily map
>>>>>>>>>>>> columns
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> metadata columns. The disadvantage is that
>>>> users cannot call
>>>>>>>> UDFs
>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>> parse timestamps. This would need to be done
>>>> in a real
>>>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I'm happy about better alternatives.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>> HI, Timo
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Sorry but I have a concern about Writing
>>>> metadata via
>>>>>>>>>>>>>> DynamicTableSink
>>>>>>>>>>>>>>>>>> section:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>>>>>>>>>>>>       id BIGINT,
>>>>>>>>>>>>>>>>>>>       name STRING,
>>>>>>>>>>>>>>>>>>>       timestamp AS
>>>> CAST(SYSTEM_METADATA("timestamp") AS
>>>>>>>>>>>>>>>>>>> BIGINT)
>>>>>>>>>>>>>>>> PERSISTED,
>>>>>>>>>>>>>>>>>>>       headers AS
>>>> CAST(SYSTEM_METADATA("headers") AS
>>>>>>>>>>>>>>>>>>> MAP<STRING,
>>>>>>>>>>>>>> BYTES>)
>>>>>>>>>>>>>>>>>> PERSISTED
>>>>>>>>>>>>>>>>>>> ) WITH (
>>>>>>>>>>>>>>>>>>>       ...
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>> An insert statement could look like:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES (
>>>>>>>>>>>>>>>>>>>       (1, "ABC", 1599133672, MAP('checksum',
>>>>>>>>>>>> computeChecksum(...)))
>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> The proposed INERT syntax does not make
>>>> sense to me,
>>>>>>>>>>>>>>>>>>> because it
>>>>>>>>>>>>>>>> contains
>>>>>>>>>>>>>>>>>> computed(generated) column.
>>>>>>>>>>>>>>>>>>> Both SQL server and Postgresql do not allow
>>>> to insert
>>>>>>>>>>>>>>>>>>> value to
>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> columns even they are persisted, this boke
>>>> the generated
>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>> semantics
>>>>>>>>>>>>>>>>>> and may confuse user much.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> For SQL server computed column[1]:
>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression
>>>> [ PERSISTED [ NOT
>>>>>>>>>>>> NULL ]
>>>>>>>>>>>>>>>> ]...
>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the
>>>> target of an INSERT or
>>>>>>>>>>>>> UPDATE
>>>>>>>>>>>>>>>>>> statement.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> For Postgresql generated column[2]:
>>>>>>>>>>>>>>>>>>>>      height_in numeric GENERATED ALWAYS
>>>> AS (height_cm /
>>>>>>>>>>>>>>>>>>>> 2.54)
>>>>>>>>>>>>> STORED
>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be
>>>> written to directly. In
>>>>>>>>>>>> INSERT
>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>> UPDATE commands, a value cannot be specified
>>>> for a generated
>>>>>>>>>>>> column,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>> the keyword DEFAULT may be specified.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> It shouldn't be allowed to set/update value
>>>> for generated
>>>>>>>> column
>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>>>>> lookup the SQL 2016:
>>>>>>>>>>>>>>>>>>>> <insert statement> ::=
>>>>>>>>>>>>>>>>>>>> INSERT INTO <insertion target> <insert
>>>> columns and source>
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If <contextually typed table value
>>>> constructor> CTTVC is
>>>>>>>>>>>>> specified,
>>>>>>>>>>>>>>>>>> then every <contextually typed row
>>>>>>>>>>>>>>>>>>>> value constructor element> simply
>>>> contained in CTTVC whose
>>>>>>>>>>>>>>>> positionally
>>>>>>>>>>>>>>>>>> corresponding <column name>
>>>>>>>>>>>>>>>>>>>> in <insert column list> references a
>>>> column of which some
>>>>>>>>>>>>> underlying
>>>>>>>>>>>>>>>>>> column is a generated column shall
>>>>>>>>>>>>>>>>>>>> be a <default specification>.
>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the
>>>> default value of
>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> associated item.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>> 
>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>> 
>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html
>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>> 
>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,17:31,Timo Walther <
>>>> twal...@apache.org>
>>>>>>>>>>>>>>>>>>>> 写道:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's
>>>> casting definition in
>>>>>>>> [1][2]
>>>>>>>>>>>>>>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be
>>>> castable from BIGINT.
>>>>>>>> If
>>>>>>>>>>>>> not,
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> will make it possible ;-)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I'm aware of
>>>> DeserializationSchema.getProducedType but I
>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> this method is actually misplaced. The type
>>>> should rather be
>>>>>>>>>>>> passed
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> source itself.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also
>>>> not use this method
>>>>>>>>>>>> because
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> Kafka source will add own metadata in
>>>> addition to the
>>>>>>>>>>>>>>>>>> DeserializationSchema. So
>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType
>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>>>>>> be read.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> For now I suggest to leave out the
>>>> `DataType` from
>>>>>>>>>>>>>>>>>> DecodingFormat.applyReadableMetadata. Also
>>>> because the
>>>>>>>>>>>>>>>>>> format's
>>>>>>>>>>>>>> physical
>>>>>>>>>>>>>>>>>> type is passed later in
>>>> `createRuntimeDecoder`. If
>>>>>>>>>>>>>>>>>> necessary, it
>>>>>>>>>>>> can
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> computed manually by consumedType + metadata
>>>> types. We will
>>>>>>>>>>>> provide
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> metadata utility class for that.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA
>>>> behavior sounds good to
>>>>>>>>>>>>>>>>>>>>> me.
>>>>>>>> I
>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>> noticed
>>>>>>>>>>>>>>>>>>>>> that a BIGINT can't be converted to
>>>> "TIMESTAMP(3) WITH
>>>>>>>>>>>>>>>>>>>>> LOCAL
>>>>>>>>>>>> TIME
>>>>>>>>>>>>>>>>>> ZONE".
>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or
>>>> use "TIMESTAMP(3) WITH
>>>>>>>>>>>> LOCAL
>>>>>>>>>>>>>>>> TIME
>>>>>>>>>>>>>>>>>>>>> ZONE" as the defined type of Kafka
>>>> timestamp? I think this
>>>>>>>>>>>> makes
>>>>>>>>>>>>>>>> sense,
>>>>>>>>>>>>>>>>>>>>> because it represents the milli-seconds
>>>> since epoch.
>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema
>>>> doesn't need TypeInfo", I
>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>> so.
>>>>>>>>>>>>>>>>>>>>> The DeserializationSchema implements
>>>> ResultTypeQueryable,
>>>>>>>> thus
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> implementation needs to return an
>>>> output TypeInfo.
>>>>>>>>>>>>>>>>>>>>> Besides, FlinkKafkaConsumer also
>>>>>>>>>>>>>>>>>>>>> calls
>>>> DeserializationSchema.getProducedType as the produced
>>>>>>>>>>>> type
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> source function [1].
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo
>>>> Walther <
>>>>>>>> twal...@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP again and hope
>>>> that I could address the
>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>>>>>>> concerns.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> @Leonard: Thanks for the explanation.
>>>> I wasn't aware that
>>>>>>>>>>>> ts_ms
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> source.ts_ms have different
>>>> semantics. I updated the FLIP
>>>>>>>> and
>>>>>>>>>>>>>> expose
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> most commonly used properties
>>>> separately. So frequently
>>>>>>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>> are not hidden in the MAP anymore:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> debezium-json.ingestion-timestamp
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.timestamp
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.database
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.schema
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.table
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> However, since other properties
>>>> depend on the used
>>>>>>>>>>>>>> connector/vendor,
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> remaining options are stored in:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.properties
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> And accessed with:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
>>>>>>>>>>>>>>>> MAP<STRING,
>>>>>>>>>>>>>>>>>>>>>> STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Otherwise it is not possible to
>>>> figure out the value and
>>>>>>>>>>>> column
>>>>>>>>>>>>>> type
>>>>>>>>>>>>>>>>>>>>>> during validation.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> @Jark: You convinced me in relaxing
>>>> the CAST
>>>>>>>>>>>>>>>>>>>>>> constraints. I
>>>>>>>>>>>>> added
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> dedicacated sub-section to the FLIP:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> For making the use of SYSTEM_METADATA
>>>> easier and avoid
>>>>>>>> nested
>>>>>>>>>>>>>>>> casting
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> allow explicit casting to a target
>>>> data type:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> rowtime AS
>>>> CAST(SYSTEM_METADATA("timestamp") AS
>>>>>>>>>>>>>>>>>>>>>> TIMESTAMP(3)
>>>>>>>>>>>>> WITH
>>>>>>>>>>>>>>>>>> LOCAL
>>>>>>>>>>>>>>>>>>>>>> TIME ZONE)
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> A connector still produces and
>>>> consumes the data type
>>>>>>>> returned
>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>> `listMetadata()`. The planner will
>>>> insert necessary
>>>>>>>>>>>>>>>>>>>>>> explicit
>>>>>>>>>>>>>> casts.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In any case, the user must provide a
>>>> CAST such that the
>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>> receives a valid data type when
>>>> constructing the table
>>>>>>>> schema.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> "I don't see a reason why
>>>>>>>>>>>> `DecodingFormat#applyReadableMetadata`
>>>>>>>>>>>>>>>>>> needs a
>>>>>>>>>>>>>>>>>>>>>> DataType argument."
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Correct he DeserializationSchema
>>>> doesn't need TypeInfo, it
>>>>>>>> is
>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>> executed locally. It is the source
>>>> that needs TypeInfo for
>>>>>>>>>>>>>>>> serializing
>>>>>>>>>>>>>>>>>>>>>> the record to the next operator. And
>>>> that's this is
>>>>>>>>>>>>>>>>>>>>>> what we
>>>>>>>>>>>>>> provide.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> @Danny:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> “SYSTEM_METADATA("offset")` returns
>>>> the NULL type by
>>>>>>>> default”
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> We can also use some other means to
>>>> represent an UNKNOWN
>>>>>>>> data
>>>>>>>>>>>>>> type.
>>>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>>>>>>> the Flink type system, we use the
>>>> NullType for it. The
>>>>>>>>>>>> important
>>>>>>>>>>>>>>>> part
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> that the final data type is known for
>>>> the entire computed
>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>> As I
>>>>>>>>>>>>>>>>>>>>>> mentioned before, I would avoid the
>>>> suggested option b)
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> similar to your suggestion. The CAST
>>>> should be enough and
>>>>>>>>>>>> allows
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> complex expressions in the computed
>>>> column. Option b)
>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>> parser
>>>>>>>>>>>>>>>>>>>>>> changes.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi, Timo
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks for you explanation and
>>>> update,  I have only one
>>>>>>>>>>>>> question
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> the latest FLIP.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> About the MAP<STRING, STRING>
>>>> DataType of key
>>>>>>>>>>>>>>>>>> 'debezium-json.source', if
>>>>>>>>>>>>>>>>>>>>>> user want to use the table name
>>>> metadata, they need to
>>>>>>>> write:
>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
>>>>>>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source')
>>>>>>>>>>>>>> AS
>>>>>>>>>>>>>>>>>>>>>> MAP<STRING, STRING>)['table']
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the expression is a little complex
>>>> for user, Could we
>>>>>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>>>>>> necessary metas with simple DataType
>>>> as following?
>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
>>>>>>>>>>>>>>>>>> 
>>>> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
>>>>>>>>>>>>>>>>>>>>>> STRING),
>>>>>>>>>>>>>>>>>>>>>>> transactionTime LONG AS
>>>>>>>>>>>>>>>>>>>>>> 
>>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS
>>>>>>>> BIGINT),
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> In this way, we can simplify the
>>>> expression, the mainly
>>>>>>>> used
>>>>>>>>>>>>>>>>>> metadata in
>>>>>>>>>>>>>>>>>>>>>> changelog format may include
>>>>>>>>>>>>>>>>>> 'database','table','source.ts_ms','ts_ms' from
>>>>>>>>>>>>>>>>>>>>>> my side,
>>>>>>>>>>>>>>>>>>>>>>> maybe we could only support them at
>>>> first version.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Both Debezium and Canal have above
>>>> four metadata, and I‘m
>>>>>>>>>>>>> willing
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> take some subtasks in next
>>>> development if necessary.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Debezium:
>>>>>>>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>        "before": null,
>>>>>>>>>>>>>>>>>>>>>>>        "after": {  "id":
>>>> 101,"name": "scooter"},
>>>>>>>>>>>>>>>>>>>>>>>        "source": {
>>>>>>>>>>>>>>>>>>>>>>>          "db":
>>>> "inventory",                  # 1.
>>>>>>>>>>>>>>>>>>>>>>> database
>>>>>>>>>>>> name
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> changelog belongs to.
>>>>>>>>>>>>>>>>>>>>>>>          "table":
>>>> "products",                # 2.
>>>>>>>>>>>>>>>>>>>>>>> table name
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>> belongs to.
>>>>>>>>>>>>>>>>>>>>>>>          "ts_ms":
>>>> 1589355504100,             # 3.
>>>>>>>>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>> of
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>>>> happened in database system, i.e.:
>>>> transaction time in
>>>>>>>>>>>> database.
>>>>>>>>>>>>>>>>>>>>>>>          "connector": "mysql",
>>>>>>>>>>>>>>>>>>>>>>>          ….
>>>>>>>>>>>>>>>>>>>>>>>        },
>>>>>>>>>>>>>>>>>>>>>>>        "ts_ms":
>>>> 1589355606100,              # 4.
>>>>>>>>>>>>>>>>>>>>>>> timestamp
>>>>>>>>>>>> when
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> debezium
>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
>>>>>>>>>>>>>>>>>>>>>>>        "op": "c",
>>>>>>>>>>>>>>>>>>>>>>>        "transaction": null
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Canal:
>>>>>>>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>        "data": [{  "id": "102",
>>>> "name": "car battery" }],
>>>>>>>>>>>>>>>>>>>>>>>        "database":
>>>> "inventory",      # 1. database
>>>>>>>>>>>>>>>>>>>>>>> name the
>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>> belongs to.
>>>>>>>>>>>>>>>>>>>>>>>        "table":
>>>> "products",          # 2. table name the
>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>> belongs
>>>>>>>>>>>>>>>>>>>>>> to.
>>>>>>>>>>>>>>>>>>>>>>>        "es":
>>>> 1589374013000,          # 3. execution
>>>>>>>>>>>>>>>>>>>>>>> time of
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> database system, i.e.: transaction
>>>> time in database.
>>>>>>>>>>>>>>>>>>>>>>>        "ts":
>>>> 1589374013680,          # 4. timestamp
>>>>>>>>>>>>>>>>>>>>>>> when the
>>>>>>>>>>>>>> cannal
>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
>>>>>>>>>>>>>>>>>>>>>>>        "isDdl": false,
>>>>>>>>>>>>>>>>>>>>>>>        "mysqlType": {},
>>>>>>>>>>>>>>>>>>>>>>>        ....
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Best
>>>>>>>>>>>>>>>>>>>>>>> Leonard
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,11:57,Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>> <yuzhao....@gmail.com> 写道:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Thanks Timo ~
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> The FLIP was already in pretty
>>>> good shape, I have only 2
>>>>>>>>>>>>>> questions
>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 1.
>>>> “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a
>>>>>>>>>>>> valid
>>>>>>>>>>>>>>>>>> read-only
>>>>>>>>>>>>>>>>>>>>>> computed column for Kafka and can be
>>>> extracted by the
>>>>>>>>>>>> planner.”
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> What is the pros we follow the
>>>> SQL-SERVER syntax here ?
>>>>>>>>>>>>> Usually
>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>> expression return type can be
>>>> inferred automatically.
>>>>>>>>>>>>>>>>>>>>>> But I
>>>>>>>>>>>>> guess
>>>>>>>>>>>>>>>>>>>>>> SQL-SERVER does not have function
>>>> like SYSTEM_METADATA
>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>>> not have a specific return type.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> And why not use the Oracle or
>>>> MySQL syntax there ?
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> column_name [datatype] [GENERATED
>>>> ALWAYS] AS
>>>>>>>>>>>>>>>>>>>>>>>> (expression)
>>>>>>>>>>>>>>>> [VIRTUAL]
>>>>>>>>>>>>>>>>>>>>>>>> Which is more straight-forward.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 2. “SYSTEM_METADATA("offset")`
>>>> returns the NULL type by
>>>>>>>>>>>>> default”
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> The default type should not be
>>>> NULL because only NULL
>>>>>>>>>>>> literal
>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>>> that. Usually we use ANY as the type
>>>> if we do not know the
>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>> type in
>>>>>>>>>>>>>>>>>>>>>> the SQL context. ANY means the
>>>> physical value can be any
>>>>>>>> java
>>>>>>>>>>>>>>>> object.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> 
>>>> https://oracle-base.com/articles/11g/virtual-columns-11gr1
>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月4日 +0800 PM4:48,Timo
>>>> Walther
>>>>>>>>>>>>>>>>>>>>>>>> <twal...@apache.org
>>>>>>>>>>>>>> ,写道:
>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I completely reworked FLIP-107.
>>>> It now covers the full
>>>>>>>>>>>> story
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> read
>>>>>>>>>>>>>>>>>>>>>>>>> and write metadata from/to
>>>> connectors and formats. It
>>>>>>>>>>>>> considers
>>>>>>>>>>>>>>>>>> all of
>>>>>>>>>>>>>>>>>>>>>>>>> the latest FLIPs, namely
>>>> FLIP-95, FLIP-132 and
>>>>>>>>>>>>>>>>>>>>>>>>> FLIP-122.
>>>>>>>> It
>>>>>>>>>>>>>>>>>> introduces
>>>>>>>>>>>>>>>>>>>>>>>>> the concept of PERSISTED
>>>> computed columns and leaves
>>>>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>>>>> for now.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your
>>>> feedback.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On 04.03.20 09:45, Kurt Young
>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry, forgot one question.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Can we make the
>>>> value.fields-include more
>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal?
>>>>>>>>>>>>> Like
>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>>> specify it as "EXCEPT_KEY,
>>>> EXCEPT_TIMESTAMP".
>>>>>>>>>>>>>>>>>>>>>>>>>> With current EXCEPT_KEY and
>>>> EXCEPT_KEY_TIMESTAMP,
>>>>>>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>> can
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> config to
>>>>>>>>>>>>>>>>>>>>>>>>>> just ignore timestamp but
>>>> keep key.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42
>>>> PM Kurt Young <
>>>>>>>>>>>> ykt...@gmail.com
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Dawid,
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a couple of
>>>> questions around key fields,
>>>>>>>> actually
>>>>>>>>>>>> I
>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>> other questions but want to
>>>> be focused on key fields
>>>>>>>>>>>> first.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I don't fully understand
>>>> the usage of
>>>>>>>>>>>>>>>>>>>>>>>>>>> "key.fields".
>>>>>>>> Is
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> option only
>>>>>>>>>>>>>>>>>>>>>>>>>>> valid during write
>>>> operation? Because for
>>>>>>>>>>>>>>>>>>>>>>>>>>> reading, I can't imagine
>>>> how such options can be
>>>>>>>>>>>> applied. I
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>> expect
>>>>>>>>>>>>>>>>>>>>>>>>>>> that there might be a
>>>> SYSTEM_METADATA("key")
>>>>>>>>>>>>>>>>>>>>>>>>>>> to read and assign the key
>>>> to a normal field?
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. If "key.fields" is only
>>>> valid in write
>>>>>>>>>>>>>>>>>>>>>>>>>>> operation, I
>>>>>>>>>>>> want
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> propose we
>>>>>>>>>>>>>>>>>>>>>>>>>>> can simplify the options to
>>>> not introducing
>>>>>>>>>>>> key.format.type
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>> other related options. I
>>>> think a single "key.field"
>>>>>>>> (not
>>>>>>>>>>>>>>>> fields)
>>>>>>>>>>>>>>>>>>>>>> would be
>>>>>>>>>>>>>>>>>>>>>>>>>>> enough, users can use UDF
>>>> to calculate whatever key
>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>>>>> want before sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Also I don't want to
>>>> introduce "value.format.type"
>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>> "value.format.xxx" with the
>>>> "value" prefix. Not every
>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>> has a
>>>>>>>>>>>>>>>>>>>>>>>>>>> concept
>>>>>>>>>>>>>>>>>>>>>>>>>>> of key and values. The old
>>>> parameter "format.type"
>>>>>>>>>>>> already
>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>>> enough to
>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 3, 2020 at
>>>> 10:40 PM Jark Wu <
>>>>>>>>>>>> imj...@gmail.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have two more questions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SupportsMetadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Introducing
>>>> SupportsMetadata sounds good to me.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> But I
>>>>>>>>>>>> have
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>> questions
>>>>>>>>>>>>>>>>>>>>>>>>>>>> regarding to this
>>>> interface.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) How do the source know
>>>> what the expected return
>>>>>>>> type
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>> metadata?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Where to put the
>>>> metadata fields? Append to the
>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>> physical
>>>>>>>>>>>>>>>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> If yes, I would suggest
>>>> to change the signature to
>>>>>>>>>>>>>>>> `TableSource
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> appendMetadataFields(String[] metadataNames,
>>>>>>>> DataType[]
>>>>>>>>>>>>>>>>>>>>>> metadataTypes)`
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> SYSTEM_METADATA("partition")
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can SYSTEM_METADATA()
>>>> function be used nested in a
>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression? If yes, how
>>>> to specify the return
>>>>>>>>>>>>>>>>>>>>>>>>>>>> type of
>>>>>>>>>>>>>>>>>>>>>> SYSTEM_METADATA?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 Mar 2020 at
>>>> 17:06, Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I thought a bit more
>>>> on how the source would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> columns
>>>>>>>>>>>>>>>>>>>>>> and I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> now see its not exactly
>>>> the same as regular
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns.
>>>>>>>> I
>>>>>>>>>>>>> see
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> elaborate a bit more on
>>>> that in the FLIP as you
>>>>>>>> asked,
>>>>>>>>>>>>>> Jark.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do agree mostly with
>>>> Danny on how we should do
>>>>>>>> that.
>>>>>>>>>>>>> One
>>>>>>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> things I would
>>>> introduce is an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interface
>>>> SupportsMetadata {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean
>>>> supportsMetadata(Set<String>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadataFields);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableSource
>>>> generateMetadataFields(Set<String>
>>>>>>>>>>>>>>>> metadataFields);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This way the source
>>>> would have to declare/emit only
>>>>>>>> the
>>>>>>>>>>>>>>>>>> requested
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata fields. In
>>>> order not to clash with user
>>>>>>>>>>>> defined
>>>>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>>>>>>>>> When
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emitting the metadata
>>>> field I would prepend the
>>>>>>>> column
>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> __system_{property_name}. Therefore when requested
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> SYSTEM_METADATA("partition") the source would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> append
>>>>>>>> a
>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> __system_partition to
>>>> the schema. This would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> never
>>>>>>>>>>>>>> visible
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user as it would be
>>>> used only for the subsequent
>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> columns.
>>>>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that makes sense to
>>>> you, I will update the FLIP
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> description.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. CAST vs explicit
>>>> type in computed columns
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I agree with
>>>> Danny. It is also the current
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Partitioning on
>>>> computed column vs function
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I also agree with
>>>> Danny. I also think those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> orthogonal. I
>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leave out the STORED
>>>> computed columns out of the
>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> don't see
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how do they relate to
>>>> the partitioning. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already put
>>>>>>>>>>>>> both
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cases in the document.
>>>> We can either partition on a
>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>> column or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use a udf in a
>>>> partioned by clause. I am fine with
>>>>>>>>>>>>> leaving
>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning by udf in
>>>> the first version if you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still
>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>>> concerns.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for your question
>>>> Danny. It depends which
>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>> strategy
>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the HASH
>>>> partitioning strategy I thought it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>> work
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> explained. It would be
>>>> N = MOD(expr, num). I am not
>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>>>> though if
>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should introduce the
>>>> PARTITIONS clause. Usually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> own
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data and the partitions
>>>> are already an intrinsic
>>>>>>>>>>>> property
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underlying source e.g.
>>>> for kafka we do not create
>>>>>>>>>>>> topics,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> describe pre-existing
>>>> pre-partitioned topic.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. timestamp vs
>>>> timestamp.field vs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.field vs
>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with changing
>>>> it to timestamp.field to be
>>>>>>>>>>>>>>>> consistent
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other value.fields and
>>>> key.fields. Actually that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was
>>>>>>>>>>>> also
>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>> initial
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal in a first
>>>> draft I prepared. I changed it
>>>>>>>>>>>>>> afterwards
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> shorten
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the key.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 03/03/2020 09:00,
>>>> Danny Chan wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
>>>> bringing up this discussion, I
>>>>>>>> think
>>>>>>>>>>>> it
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature ~
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About how the
>>>> metadata outputs from source
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it is
>>>> completely orthogonal, computed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>> push
>>>>>>>>>>>>>>>>>> down is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another topic, this
>>>> should not be a blocker but a
>>>>>>>>>>>>>> promotion,
>>>>>>>>>>>>>>>>>> if we
>>>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have any filters on the
>>>> computed column, there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is no
>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> do any
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pushings; the source
>>>> node just emit the complete
>>>>>>>> record
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> full
>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the declared
>>>> physical schema, then when
>>>>>>>> generating
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> virtual
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns, we would
>>>> extract the metadata info and
>>>>>>>> output
>>>>>>>>>>>> as
>>>>>>>>>>>>>>>> full
>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns(with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> full schema).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the type of
>>>> metadata column
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally i prefer
>>>> explicit type instead of CAST,
>>>>>>>>>>>> they
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>> symantic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> equivalent though,
>>>> explict type is more
>>>>>>>>>>>> straight-forward
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>> declare
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the nullable attribute
>>>> there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About option A:
>>>> partitioning based on acomputed
>>>>>>>> column
>>>>>>>>>>>>> VS
>>>>>>>>>>>>>>>>>> option
>>>>>>>>>>>>>>>>>>>>>> B:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning with just
>>>> a function
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>      From the FLIP,
>>>> it seems that B's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning is
>>>>>>>>>>>>> just
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> strategy
>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing data, the
>>>> partiton column is not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> included in
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>> schema,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's just useless when
>>>> reading from that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Compared to A, we
>>>> do not need to generate the
>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> selecting from the
>>>> table(but insert into)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - For A we can also
>>>> mark the column as STORED when
>>>>>>>> we
>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> persist
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So in my opition they
>>>> are orthogonal, we can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>> both, i
>>>>>>>>>>>>>>>>>> saw
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> MySQL/Oracle[1][2]
>>>> would suggest to also define the
>>>>>>>>>>>>>>>> PARTITIONS
>>>>>>>>>>>>>>>>>>>>>> num, and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions are managed
>>>> under a "tablenamespace",
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> record is stored is
>>>> partition number N, where N =
>>>>>>>>>>>>> MOD(expr,
>>>>>>>>>>>>>>>>>> num),
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design, which partiton
>>>> the record would persist ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年3月2日 +0800
>>>> PM6:16,Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>> dwysakow...@apache.org
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ,写道:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 2 I added a
>>>> section to discuss relation to
>>>>>>>>>>>> FLIP-63
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 3 Yes, I also
>>>> tried to somewhat keep
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hierarchy
>>>>>>>> of
>>>>>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore you have the
>>>> key.format.type.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also considered
>>>> exactly what you are suggesting
>>>>>>>>>>>>>>>> (prefixing
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector or kafka). I
>>>> should've put that into an
>>>>>>>>>>>>>>>>>> Option/Rejected
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternatives.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree timestamp,
>>>> key.*, value.* are connector
>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>> Why I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wanted to suggest not
>>>> adding that prefix in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> actually all the
>>>> properties in the WITH section are
>>>>>>>>>>>>>> connector
>>>>>>>>>>>>>>>>>>>>>>>>>>>> properties.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even format is in the
>>>> end a connector property as
>>>>>>>> some
>>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not have a format, imo.
>>>> The benefit of not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding the
>>>>>>>>>>>>>> prefix
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> that it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes the keys a bit
>>>> shorter. Imagine prefixing all
>>>>>>>> the
>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector (or if we go
>>>> with FLINK-12557:
>>>>>>>>>>>> elasticsearch):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> elasticsearch.key.format.type: csv
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> elasticsearch.key.format.field: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> elasticsearch.key.format.delimiter: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> elasticsearch.key.format.*: ....
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with
>>>> doing it though if this is a
>>>>>>>> preferred
>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> community.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad in-line comments:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I forgot to update
>>>> the `value.fields.include`
>>>>>>>>>>>> property.
>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include.
>>>> Which I think you also
>>>>>>>> suggested
>>>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> comment,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the cast vs
>>>> declaring output type of
>>>>>>>> computed
>>>>>>>>>>>>>>>> column.
>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's better not to use
>>>> CAST, but declare a type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of an
>>>>>>>>>>>>>>>>>> expression
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> later
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on infer the output
>>>> type of SYSTEM_METADATA. The
>>>>>>>> reason
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be easier to
>>>> implement e.g. filter push
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> downs
>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>> working
>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> native types of the
>>>> source, e.g. in case of Kafka's
>>>>>>>>>>>>>> offset, i
>>>>>>>>>>>>>>>>>>>>>> think it's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better to pushdown long
>>>> rather than string. This
>>>>>>>> could
>>>>>>>>>>>>> let
>>>>>>>>>>>>>> us
>>>>>>>>>>>>>>>>>> push
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression like e.g.
>>>> offset > 12345 & offset <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
>>>>>>>>>>>>>>>>>> Otherwise we
>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to push down
>>>> cast(offset, long) > 12345 &&
>>>>>>>>>>>>>> cast(offset,
>>>>>>>>>>>>>>>>>> long)
>>>>>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover I think we
>>>> need to introduce the type for
>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>> columns
>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyway
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to support functions
>>>> that infer output type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> based on
>>>>>>>>>>>>>> expected
>>>>>>>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>>>>>>>>>>>>>> type.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the computed
>>>> column push down. Yes,
>>>>>>>>>>>>>> SYSTEM_METADATA
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be pushed down to
>>>> the source. If it is not
>>>>>>>> possible
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> planner
>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fail. As far as I know
>>>> computed columns push down
>>>>>>>> will
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> part
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rework, won't it? ;)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the
>>>> persisted computed column. I think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>> completely
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal. In my
>>>> current proposal you can also
>>>>>>>>>>>> partition
>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column. The difference
>>>> between using a udf in
>>>>>>>>>>>> partitioned
>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>> vs
>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a computed column is
>>>> that when you partition
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a
>>>>>>>>>>>>>> computed
>>>>>>>>>>>>>>>>>>>>>> column
>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column must be also
>>>> computed when reading the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
>>>>>>>> If
>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> use a
>>>>>>>>>>>>>>>>>>>>>> udf in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the partitioned by, the
>>>> expression is computed only
>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>> inserting
>>>>>>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers
>>>> some of your questions. Looking
>>>>>>>>>>>>> forward
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> further
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 02/03/2020
>>>> 05:18, Jark Wu wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
>>>> starting such a great
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>> Reaing
>>>>>>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key-part
>>>> information from source is an important
>>>>>>>>>>>>> feature
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In general, I
>>>> agree with the proposal of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will leave my
>>>> thoughts and comments here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) +1 to use
>>>> connector properties instead of
>>>>>>>>>>>>> introducing
>>>>>>>>>>>>>>>>>> HEADER
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keyword as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the reason you
>>>> mentioned in the FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) we already
>>>> introduced PARTITIONED BY in
>>>>>>>> FLIP-63.
>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> add a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> section to
>>>> explain what's the relationship
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>> them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do their concepts
>>>> conflict? Could INSERT
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITION
>>>>>>>>>>>> be
>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITIONED table
>>>> in this FLIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Currently,
>>>> properties are hierarchical in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>> SQL.
>>>>>>>>>>>>>>>>>> Shall we
>>>>>>>>>>>>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new introduced
>>>> properties more hierarchical?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example,
>>>> "timestamp" =>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>> "connector.timestamp"?
>>>>>>>>>>>>>>>>>> (actually, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> prefer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "kafka.timestamp"
>>>> which is another
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLINK-12557)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A single
>>>> "timestamp" in properties may mislead
>>>>>>>> users
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a rowtime
>>>> attribute.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also left some
>>>> minor comments in the FLIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, 1 Mar
>>>> 2020 at 22:30, Dawid Wysakowicz <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to
>>>> propose an improvement that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>>>>>>>>> reading
>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns from
>>>> different parts of source records.
>>>>>>>>>>>>> Besides
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> main
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> payload
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> majority (if
>>>> not all of the sources) expose
>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>> information. It
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can be simply a
>>>> read-only metadata such as
>>>>>>>> offset,
>>>>>>>>>>>>>>>>>> ingestion
>>>>>>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>>>>>>>>>>>>> or a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read and write
>>>> parts of the record that contain
>>>>>>>>>>>> data
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>>>>>>> additionally
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serve different
>>>> purposes (partitioning,
>>>>>>>> compaction
>>>>>>>>>>>>>> etc.),
>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamp in
>>>> Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should make
>>>> it possible to read and write
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locations. In
>>>> this proposal I discuss reading
>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>>>>>> data,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> completeness
>>>> this proposal discusses also the
>>>>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data out.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am looking
>>>> forward to your comments.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> You can access
>>>> the FLIP here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
> 

Reply via email to