The new syntax looks good to me.
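
For reference, here is a minimal sketch of how I read the syntax agreed on in the thread below. The table and column names are only illustrative, the types and connector options are copied from the examples further down, and `timestamp` and `offset` are backtick-quoted on the assumption that they are reserved words. The FLIP remains the authoritative definition.

```sql
CREATE TABLE kafka_table (
    id BIGINT,
    name STRING,
    -- shortcut: the metadata key equals the column name; readable and writable
    `timestamp` INT METADATA,
    -- FROM names the metadata key explicitly, here because the column name differs
    hdrs MAP<STRING, BYTES> METADATA FROM 'headers',
    -- VIRTUAL marks read-only metadata that is excluded from the sink schema
    `offset` INT METADATA VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'value.format' = 'avro'
);
```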

Best,
Kurt


On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Timo,
>
> I have one minor suggestion.
> Maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH
> LOCAL TIME ZONE`, because this is the type that users want to use, and it
> avoids unnecessary casting.
> Besides, a BIGINT is currently cast to a timestamp in seconds, so the
> implicit cast may not work...
>
> I don't have other objections. But maybe we should wait for the
> opinion from @Kurt for the new syntax.
>
> Best,
> Jark
>
>
> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> wrote:
>
>> Thanks for driving this Timo, +1 for voting ~
>>
>> Best,
>> Danny Chan
>> On Sep 10, 2020, 3:47 PM +0800, Timo Walther <twal...@apache.org> wrote:
>> > Thanks everyone for this healthy discussion. I updated the FLIP with the
>> > outcome. I think the result is very powerful but also very easy to
>> > declare. Thanks for all the contributions.
>> >
>> > If there are no objections, I would continue with a vote.
>> >
>> > What do you think?
>> >
>> > Regards,
>> > Timo
>> >
>> >
>> > On 09.09.20 16:52, Timo Walther wrote:
>> > > "If virtual by default, when a user types "timestamp int" ==> persisted
>> > > column, then adds a "metadata" after that ==> virtual column, then adds
>> > > a "persisted" after that ==> persisted column."
>> > >
>> > > Thanks for this nice mental model explanation, Jark. This makes total
>> > > sense to me. Also, making the most common case as short as just
>> > > adding `METADATA` is a very good idea. Thanks, Danny!
>> > >
>> > > Let me update the FLIP again with all these ideas.
>> > >
>> > > Regards,
>> > > Timo
>> > >
>> > >
>> > > On 09.09.20 15:03, Jark Wu wrote:
>> > > > I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
>> > > > 'my-timestamp-field'] [VIRTUAL]
>> > > > I especially like the shortcut: timestamp INT METADATA. This supports
>> > > > the most common case in the simplest way.
>> > > >
>> > > > I also think the default should be "PERSISTED", so VIRTUAL is optional
>> > > > when you are accessing read-only metadata. Because:
>> > > > 1. The "timestamp INT METADATA" should be a normal column, because
>> > > > "METADATA" is just a modifier to indicate it is from metadata, and a
>> > > > normal column should be persisted.
>> > > >      If virtual by default, when a user types "timestamp int" ==>
>> > > > persisted column, then adds a "metadata" after that ==> virtual column,
>> > > > then adds a "persisted" after that ==> persisted column.
>> > > >      I think this flips back and forth several times and confuses users.
>> > > > Physical fields are also declared as "fieldName TYPE", so it is very
>> > > > straightforward that "timestamp INT METADATA" is persisted.
>> > > > 2. From the collected user questions [1], we can see that "timestamp"
>> > > > is the most common use case. "timestamp" is read-write metadata.
>> > > > Persisted by default doesn't break the reading behavior.
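>> > > >
>> > > > To make the effect on the query-sink schema concrete, here is a rough
>> > > > sketch under the persisted-by-default reading. The table and column
>> > > > names are only illustrative, and another_table with its columns is
>> > > > hypothetical:
>> > > >
>> > > > ```sql
>> > > > CREATE TABLE kafka_table (
>> > > >     id BIGINT,
>> > > >     name STRING,
>> > > >     -- persisted by default: part of the sink schema
>> > > >     `timestamp` INT METADATA,
>> > > >     -- read-only: not part of the sink schema
>> > > >     `offset` INT METADATA VIRTUAL
>> > > > ) WITH (
>> > > >     'connector' = 'kafka',
>> > > >     'topic' = 'test-topic',
>> > > >     'value.format' = 'avro'
>> > > > );
>> > > >
>> > > > -- Reading sees all four columns.
>> > > > SELECT id, name, `timestamp`, `offset` FROM kafka_table;
>> > > >
>> > > > -- Writing supplies only the persisted columns; `offset` is left out.
>> > > > INSERT INTO kafka_table
>> > > > SELECT id, name, ts FROM another_table;
>> > > > ```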
>> > > >
>> > > > Best,
>> > > > Jark
>> > > >
>> > > > [1]: https://issues.apache.org/jira/browse/FLINK-15869
>> > > >
>> > > > On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> wrote:
>> > > >
>> > > > > Thanks @Dawid for the nice summary, I think you captured all opinions
>> > > > > of the long discussion well.
>> > > > >
>> > > > > @Danny
>> > > > > “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
>> > > > >   Note that the "FROM 'field name'" is only needed when the name
>> > > > >   conflicts with the declared table column name; when there are no
>> > > > >   conflicts, we can simplify it to
>> > > > >        timestamp INT METADATA"
>> > > > >
>> > > > > I really like the proposal; there is no confusion with computed
>> > > > > columns any more, and it’s concise enough.
>> > > > >
>> > > > >
>> > > > > @Timo @Dawid
>> > > > > “We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
>> > > > > makes it clearer that it comes magically from the system.”
>> > > > > “As for the issue of shortening the SYSTEM_METADATA to METADATA. Here I
>> > > > > very much prefer the SYSTEM_ prefix.”
>> > > > >
>> > > > > I think `SYSTEM_TIME` is quite different from `SYSTEM_METADATA`.
>> > > > > First of all, the word `TIME` has broad meanings but the word
>> > > > > `METADATA` does not; `METADATA` has a specific meaning.
>> > > > > Secondly, `FOR SYSTEM_TIME AS OF` exists in the SQL standard but
>> > > > > `SYSTEM_METADATA` does not.
>> > > > > Personally, I prefer the simpler way; sometimes less is more.
>> > > > >
>> > > > >
>> > > > > Best,
>> > > > > Leonard
>> > > > >
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > Timo Walther <twal...@apache.org> wrote on Wed, Sep 9, 2020 at 6:41 PM:
>> > > > > >
>> > > > > > > Hi everyone,
>> > > > > > >
>> > > > > > > "key" and "value" in the properties are a special case because they
>> > > > > > > need to configure a format. So key and value are more than just
>> > > > > > > metadata. Jark's example for setting a timestamp would work but as the
>> > > > > > > FLIP discusses, we have way more metadata fields like headers,
>> > > > > > > epoch-leader, etc. Having a property for all of this metadata would
>> > > > > > > mess up the WITH section entirely. Furthermore, we also want to deal
>> > > > > > > with metadata from the formats. Solving this through properties as
>> > > > > > > well would further complicate the property design.
>> > > > > > >
>> > > > > > > Personally, I still like the computed column design more because it
>> > > > > > > allows full flexibility to compute the final column:
>> > > > > > >
>> > > > > > > timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>> > > > > > >
>> > > > > > > Instead of having a helper column and a real column in the table:
>> > > > > > >
>> > > > > > > helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
>> > > > > > > realTimestamp AS adjustTimestamp(helperTimestamp)
>> > > > > > >
>> > > > > > > But I see that the discussion leans towards:
>> > > > > > >
>> > > > > > > timestamp INT SYSTEM_METADATA("ts")
>> > > > > > >
>> > > > > > > Which is fine with me. It is the shortest solution, because we don't
>> > > > > > > need an additional CAST. We can discuss the syntax, so that confusion
>> > > > > > > with computed columns can be avoided.
>> > > > > > >
>> > > > > > > timestamp INT USING SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts")
>> > > > > > > timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>> > > > > > >
>> > > > > > > We use `SYSTEM_TIME` for temporal tables. I think prefixing with
>> > > > > > > SYSTEM makes it clearer that it comes magically from the system.
>> > > > > > >
>> > > > > > > What do you think?
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Timo
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On 09.09.20 11:41, Jark Wu wrote:
>> > > > > > > > Hi Danny,
>> > > > > > > >
>> > > > > > > > This is not Oracle and MySQL computed column syntax, because there is
>> > > > > > > > no "AS" after the type.
>> > > > > > > >
>> > > > > > > > Hi everyone,
>> > > > > > > >
>> > > > > > > > If we want to use "offset INT SYSTEM_METADATA("offset")", then I
>> > > > > > > > think we must further discuss the "PERSISTED" or "VIRTUAL" keyword
>> > > > > > > > for the query-sink schema problem.
>> > > > > > > > Personally, I think we can use a shorter keyword "METADATA" for
>> > > > > > > > "SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system
>> > > > > > > > function and may confuse users into thinking this is a computed
>> > > > > > > > column.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Jark
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
>> > > > > > > >
>> > > > > > > > > "offset INT SYSTEM_METADATA("offset")"
>> > > > > > > > >
>> > > > > > > > > This is actually Oracle or MySQL style computed column syntax.
>> > > > > > > > >
>> > > > > > > > > "You are right that one could argue that "timestamp", "headers" are
>> > > > > > > > > something like "key" and "value""
>> > > > > > > > >
>> > > > > > > > > I have the same feeling: both key/value and headers/timestamp are
>> > > > > > > > > *real* data stored in the consumed record; they are not computed or
>> > > > > > > > > generated.
>> > > > > > > > >
>> > > > > > > > > "Trying to solve everything via properties sounds rather like a hack
>> > > > > > > > > to me"
>> > > > > > > > >
>> > > > > > > > > Things are not that hacky if we can unify the routines or the
>> > > > > > > > > definitions (all via computed columns or all via table options). I
>> > > > > > > > > also think that it is hacky to mix 2 kinds of syntax for different
>> > > > > > > > > kinds of metadata (read-only and read-write). In this FLIP, we
>> > > > > > > > > declare the Kafka key fields with table options but use
>> > > > > > > > > SYSTEM_METADATA for other metadata; that is hacky, or at least
>> > > > > > > > > inconsistent.
>> > > > > > > > >
>> > > > > > > > > Kurt Young <ykt...@gmail.com> wrote on Wed, Sep 9, 2020 at 4:48 PM:
>> > > > > > > > >
>> > > > > > > > > >   I would vote for `offset INT SYSTEM_METADATA("offset")`.
>> > > > > > > > > >
>> > > > > > > > > > I don't think we can stick with the SQL standard in the DDL part
>> > > > > > > > > > forever, especially as there are more and more
>> > > > > > > > > > requirements coming from different connectors and external systems.
>> > > > > > > > > >
>> > > > > > > > > > Best,
>> > > > > > > > > > Kurt
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Jark,
>> > > > > > > > > > >
>> > > > > > > > > > > now we are back at the original design proposed by Dawid :D
>> > > > > > > > > > > Yes, we should be cautious about adding new syntax. But the
>> > > > > > > > > > > length of this discussion shows that we are looking for a good
>> > > > > > > > > > > long-term solution. In this case I would rather vote for a deep
>> > > > > > > > > > > integration into the syntax.
>> > > > > > > > > > >
>> > > > > > > > > > > Computed columns are also not SQL standard compliant. And
>> > > > > > > > > > > neither is our DDL, so we have some degree of freedom here.
>> > > > > > > > > > >
>> > > > > > > > > > > Trying to solve everything via properties sounds rather like a
>> > > > > > > > > > > hack to me. You are right that one could argue that "timestamp",
>> > > > > > > > > > > "headers" are something like "key" and "value". However, mixing
>> > > > > > > > > > >
>> > > > > > > > > > > `offset AS SYSTEM_METADATA("offset")`
>> > > > > > > > > > >
>> > > > > > > > > > > and
>> > > > > > > > > > >
>> > > > > > > > > > > `'timestamp.field' = 'ts'`
>> > > > > > > > > > >
>> > > > > > > > > > > looks more confusing to users than an explicit
>> > > > > > > > > > >
>> > > > > > > > > > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
>> > > > > > > > > > >
>> > > > > > > > > > > or
>> > > > > > > > > > >
>> > > > > > > > > > > `offset INT SYSTEM_METADATA("offset")`
>> > > > > > > > > > >
>> > > > > > > > > > > that is symmetric for both source and sink.
>> > > > > > > > > > >
>> > > > > > > > > > > What do others think?
>> > > > > > > > > > >
>> > > > > > > > > > > Regards,
>> > > > > > > > > > > Timo
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On 09.09.20 10:09, Jark Wu wrote:
>> > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think we have a conclusion that the writable metadata shouldn't
>> > > > > > > > > > > > be defined as a computed column, but as a normal column.
>> > > > > > > > > > > >
>> > > > > > > > > > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
>> > > > > > > > > > > > approaches.
>> > > > > > > > > > > > However, it is not SQL standard compliant, so we need to be
>> > > > > > > > > > > > cautious when adding new syntax.
>> > > > > > > > > > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL`
>> > > > > > > > > > > > keyword to resolve the query-sink schema problem if it is
>> > > > > > > > > > > > read-only metadata. That adds more stuff for users to learn.
>> > > > > > > > > > > >
>> > > > > > > > > > > > From my point of view, the "timestamp" and "headers" are something
>> > > > > > > > > > > > like "key" and "value" that are stored with the real data. So why
>> > > > > > > > > > > > not define the "timestamp" in the same way as "key" by using a
>> > > > > > > > > > > > "timestamp.field" connector option?
>> > > > > > > > > > > > On the other side, read-only metadata, such as "offset",
>> > > > > > > > > > > > shouldn't be defined as a normal column. So why not use the
>> > > > > > > > > > > > existing computed column syntax for such metadata? Then we don't
>> > > > > > > > > > > > have the query-sink schema problem.
>> > > > > > > > > > > > So here is my proposal:
>> > > > > > > > > > > >
>> > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > >     id BIGINT,
>> > > > > > > > > > > >     name STRING,
>> > > > > > > > > > > >     col1 STRING,
>> > > > > > > > > > > >     col2 STRING,
>> > > > > > > > > > > >     -- ts is a normal field, so it can be read and written.
>> > > > > > > > > > > >     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,
>> > > > > > > > > > > >     offset AS SYSTEM_METADATA("offset")
>> > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > >     'connector' = 'kafka',
>> > > > > > > > > > > >     'topic' = 'test-topic',
>> > > > > > > > > > > >     'key.fields' = 'id, name',
>> > > > > > > > > > > >     'key.format' = 'csv',
>> > > > > > > > > > > >     'value.format' = 'avro',
>> > > > > > > > > > > >     'timestamp.field' = 'ts'    -- define the mapping of Kafka timestamp
>> > > > > > > > > > > > );
>> > > > > > > > > > > >
>> > > > > > > > > > > > INSERT INTO kafka_table
>> > > > > > > > > > > > SELECT id, name, col1, col2, rowtime FROM another_table;
>> > > > > > > > > > > >
>> > > > > > > > > > > > I think this can solve all the problems without introducing
>> > > > > > > > > > > > any new syntax.
>> > > > > > > > > > > > The only minor disadvantage is that we separate the definition
>> > > > > > > > > > > > way/syntax of read-only metadata and read-write fields.
>> > > > > > > > > > > > However, I don't think this is a big problem.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Jark
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi Kurt,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > thanks for sharing your opinion. I'm totally up for not reusing
>> > > > > > > > > > > > > computed columns. I think Jark was a big supporter of this
>> > > > > > > > > > > > > syntax, @Jark are you fine with this as well? The non-computed
>> > > > > > > > > > > > > column approach was only a "slightly rejected alternative".
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Furthermore, we would need to think about how such a new design
>> > > > > > > > > > > > > influences the LIKE clause though.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > However, we should still keep the `PERSISTED` keyword as it
>> > > > > > > > > > > > > influences the query->sink schema. If you look at the list of
>> > > > > > > > > > > > > metadata for existing connectors and formats, we currently offer
>> > > > > > > > > > > > > only two writable metadata fields. Otherwise, one would need to
>> > > > > > > > > > > > > declare two tables whenever a metadata column is read (one for
>> > > > > > > > > > > > > the source, one for the sink). This can be quite inconvenient
>> > > > > > > > > > > > > e.g. for just reading the topic.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > Timo
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On 09.09.20 08:52, Kurt Young wrote:
>> > > > > > > > > > > > > > I also share the concern that reusing the computed column
>> > > > > > > > > > > > > > syntax but having different semantics would confuse users a lot.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Besides, I think metadata fields are conceptually not the same
>> > > > > > > > > > > > > > as computed columns. The metadata field is a connector-specific
>> > > > > > > > > > > > > > thing and it only contains the information about where the field
>> > > > > > > > > > > > > > comes from (during source) or where the field needs to be
>> > > > > > > > > > > > > > written to (during sink). It's more similar to normal fields,
>> > > > > > > > > > > > > > with the assumption that all these fields go to the data part.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thus I lean more towards the rejected alternative that Timo
>> > > > > > > > > > > > > > mentioned. And I think we don't need the PERSISTED keyword;
>> > > > > > > > > > > > > > SYSTEM_METADATA should be enough.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > During implementation, the framework only needs to pass such
>> > > > > > > > > > > > > > <field, metadata field> information to the connector, and the
>> > > > > > > > > > > > > > logic of handling such fields inside the connector should be
>> > > > > > > > > > > > > > straightforward.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Regarding the downside Timo mentioned:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > The disadvantage is that users cannot call UDFs or parse timestamps.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I think this is fairly simple to solve. Since the metadata
>> > > > > > > > > > > > > > field isn't a computed column anymore, we can support
>> > > > > > > > > > > > > > referencing such fields in the computed column. For example:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > >         id BIGINT,
>> > > > > > > > > > > > > >         name STRING,
>> > > > > > > > > > > > > >         -- get the timestamp field from metadata
>> > > > > > > > > > > > > >         timestamp STRING SYSTEM_METADATA("timestamp"),
>> > > > > > > > > > > > > >         -- normal computed column, parse the string to TIMESTAMP
>> > > > > > > > > > > > > >         -- type by using the metadata field
>> > > > > > > > > > > > > >         ts AS to_timestamp(timestamp)
>> > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > >        ...
>> > > > > > > > > > > > > > )
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > Kurt
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Hi Leonard,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > the only alternative I see is that we introduce a concept that
>> > > > > > > > > > > > > > > is completely different to computed columns. This is also
>> > > > > > > > > > > > > > > mentioned in the rejected alternative section of the FLIP.
>> > > > > > > > > > > > > > > Something like:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > > >         id BIGINT,
>> > > > > > > > > > > > > > >         name STRING,
>> > > > > > > > > > > > > > >         timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
>> > > > > > > > > > > > > > >         headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
>> > > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > > >        ...
>> > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > This way we would avoid confusion altogether and can easily map
>> > > > > > > > > > > > > > > columns to metadata columns. The disadvantage is that users
>> > > > > > > > > > > > > > > cannot call UDFs or parse timestamps. This would need to be
>> > > > > > > > > > > > > > > done in a real computed column.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I'm happy about better alternatives.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On 08.09.20 15:37, Leonard Xu wrote:
>> > > > > > > > > > > > > > > > HI, Timo
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thanks for driving this FLIP.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Sorry but I have a concern about the "Writing metadata via
>> > > > > > > > > > > > > > > > DynamicTableSink" section:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > CREATE TABLE kafka_table (
>> > > > > > > > > > > > > > > >       id BIGINT,
>> > > > > > > > > > > > > > > >       name STRING,
>> > > > > > > > > > > > > > > >       timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
>> > > > > > > > > > > > > > > >       headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
>> > > > > > > > > > > > > > > > ) WITH (
>> > > > > > > > > > > > > > > >       ...
>> > > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > > > An insert statement could look like:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > INSERT INTO kafka_table VALUES (
>> > > > > > > > > > > > > > > >       (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
>> > > > > > > > > > > > > > > > )
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > The proposed INSERT syntax does not make sense to me, because
>> > > > > > > > > > > > > > > > it contains computed (generated) columns.
>> > > > > > > > > > > > > > > > Neither SQL Server nor PostgreSQL allows inserting values into
>> > > > > > > > > > > > > > > > computed columns, even if they are persisted; this breaks the
>> > > > > > > > > > > > > > > > generated column semantics and may confuse users a lot.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > For SQL Server computed column[1]:
>> > > > > > > > > > > > > > > > > column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
>> > > > > > > > > > > > > > > > > NOTE: A computed column cannot be the target of an INSERT or
>> > > > > > > > > > > > > > > > > UPDATE statement.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > For PostgreSQL generated column[2]:
>> > > > > > > > > > > > > > > > >      height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
>> > > > > > > > > > > > > > > > > NOTE: A generated column cannot be written to directly. In
>> > > > > > > > > > > > > > > > > INSERT or UPDATE commands, a value cannot be specified for a
>> > > > > > > > > > > > > > > > > generated column, but the keyword DEFAULT may be specified.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > After looking up SQL 2016, setting or updating a value for a
>> > > > > > > > > > > > > > > > generated column shouldn't be allowed:
>> > > > > > > > > > > > > > > > > <insert statement> ::=
>> > > > > > > > > > > > > > > > > INSERT INTO <insertion target> <insert columns and source>
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > If <contextually typed table value constructor> CTTVC is
>> > > > > > > > > > > > > > > > > specified, then every <contextually typed row value
>> > > > > > > > > > > > > > > > > constructor element> simply contained in CTTVC whose
>> > > > > > > > > > > > > > > > > positionally corresponding <column name> in <insert column
>> > > > > > > > > > > > > > > > > list> references a column of which some underlying column is
>> > > > > > > > > > > > > > > > > a generated column shall be a <default specification>.
>> > > > > > > > > > > > > > > > > A <default specification> specifies the default value of
>> > > > > > > > > > > > > > > > > some associated item.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
>> > > > > > > > > > > > > > > > [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hi Jark,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > according to Flink's and Calcite's casting definition in
>> > > > > > > > > > > > > > > > > [1][2], TIMESTAMP WITH LOCAL TIME ZONE should be castable from
>> > > > > > > > > > > > > > > > > BIGINT. If not, we will make it possible ;-)
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > I'm aware of DeserializationSchema.getProducedType but I
>> > > > > > > > > > > > > > > > > think that this method is actually misplaced. The type should
>> > > > > > > > > > > > > > > > > rather be passed to the source itself.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > For our Kafka SQL source, we will also not use this method
>> > > > > > > > > > > > > > > > > because the Kafka source will add its own metadata in addition
>> > > > > > > > > > > > > > > > > to the DeserializationSchema. So
>> > > > > > > > > > > > > > > > > DeserializationSchema.getProducedType will never be read.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > For now I suggest leaving out the `DataType` from
>> > > > > > > > > > > > > > > > > DecodingFormat.applyReadableMetadata, also because the
>> > > > > > > > > > > > > > > > > format's physical type is passed later in
>> > > > > > > > > > > > > > > > > `createRuntimeDecoder`. If necessary, it can be computed
>> > > > > > > > > > > > > > > > > manually by consumedType + metadata types. We will provide a
>> > > > > > > > > > > > > > > > > metadata utility class for that.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
>> > > > > > > > > > > > > > > > > [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On 08.09.20 10:52, Jark Wu wrote:
>> > > > > > > > > > > > > > > > > > Hi Timo,
>> > > > > > > > > > > > > > > > > > The updated CAST SYSTEM_METADATA behavior sounds good to me. I just
>> > > > > > > > > > > > > > > > > > noticed that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL
>> > > > > > > > > > > > > > > > > > TIME ZONE". So maybe we need to support this, or use "TIMESTAMP(3)
>> > > > > > > > > > > > > > > > > > WITH LOCAL TIME ZONE" as the defined type of the Kafka timestamp? I
>> > > > > > > > > > > > > > > > > > think this makes sense, because it represents the milliseconds since
>> > > > > > > > > > > > > > > > > > epoch.
>> > > > > > > > > > > > > > > > > > Regarding "DeserializationSchema doesn't need TypeInfo", I don't
>> > > > > > > > > > > > > > > > > > think so. The DeserializationSchema implements ResultTypeQueryable,
>> > > > > > > > > > > > > > > > > > thus the implementation needs to return an output TypeInfo.
>> > > > > > > > > > > > > > > > > > Besides, FlinkKafkaConsumer also calls
>> > > > > > > > > > > > > > > > > > DeserializationSchema.getProducedType as the produced type of the
>> > > > > > > > > > > > > > > > > > source function [1].
>> > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > Jark
>> > > > > > > > > > > > > > > > > > [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
>> > > > >
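The point above about the Kafka timestamp being milliseconds since epoch, and the earlier remark that a BIGINT is currently cast to a timestamp in seconds, can be illustrated outside of SQL. This is only a sketch using `java.time`; the class and method names are hypothetical, and the sample value is the `ts_ms` from the Debezium example quoted in this thread. It does not show how the planner performs the cast.

```java
import java.time.Instant;
import java.time.ZoneOffset;

final class EpochTimestampSketch {
    /** Interprets a BIGINT metadata value as milliseconds since the Unix epoch. */
    static Instant fromEpochMillis(long value) {
        return Instant.ofEpochMilli(value);
    }

    /** Interprets the same BIGINT as seconds since the epoch, for comparison only. */
    static Instant fromEpochSeconds(long value) {
        return Instant.ofEpochSecond(value);
    }

    public static void main(String[] args) {
        long sample = 1589355504100L; // ts_ms value from the Debezium example in this thread

        // Read as milliseconds, the value lands in the year 2020.
        System.out.println(fromEpochMillis(sample).atOffset(ZoneOffset.UTC).getYear());

        // Read as seconds, the same number lands tens of thousands of years ahead.
        System.out.println(fromEpochSeconds(sample).atOffset(ZoneOffset.UTC).getYear());
    }
}
```

The gap between the two readings is why the unit of the implicit cast matters for the declared type of `timestamp`.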
>> > > > > > > > > > > > > > > > > > On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > I updated the FLIP again and hope that I could address the mentioned
>> > > > > > > > > > > > > > > > > > > concerns.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
>> > > > > > > > > > > > > > > > > > > source.ts_ms have different semantics. I updated the FLIP and expose
>> > > > > > > > > > > > > > > > > > > the most commonly used properties separately. So frequently used
>> > > > > > > > > > > > > > > > > > > properties are not hidden in the MAP anymore:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > debezium-json.ingestion-timestamp
>> > > > > > > > > > > > > > > > > > > debezium-json.source.timestamp
>> > > > > > > > > > > > > > > > > > > debezium-json.source.database
>> > > > > > > > > > > > > > > > > > > debezium-json.source.schema
>> > > > > > > > > > > > > > > > > > > debezium-json.source.table
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > However, since other properties depend on the used connector/vendor,
>> > > > > > > > > > > > > > > > > > > the remaining options are stored in:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > debezium-json.source.properties
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > And accessed with:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Otherwise it is not possible to figure out the value and column type
>> > > > > > > > > > > > > > > > > > > during validation.
>> > > > > > > > > > > > > > > > > > >
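The split described above, a handful of dedicated keys plus a catch-all map, can be sketched in plain Java. Everything here is an assumption made for illustration: the class name, the mapping from Debezium `source` fields such as `db` to the dedicated keys, and the choice to leave promoted fields out of the remaining map are not specified in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DebeziumSourceMetadataSketch {
    // Assumed mapping from Debezium "source" fields to the dedicated metadata keys.
    private static final Map<String, String> DEDICATED = Map.of(
            "ts_ms", "debezium-json.source.timestamp",
            "db", "debezium-json.source.database",
            "schema", "debezium-json.source.schema",
            "table", "debezium-json.source.table");

    /** Returns the entries of the source block that have a dedicated metadata key. */
    static Map<String, String> dedicated(Map<String, String> source) {
        Map<String, String> out = new LinkedHashMap<>();
        source.forEach((field, value) -> {
            String key = DEDICATED.get(field);
            if (key != null) {
                out.put(key, value);
            }
        });
        return out;
    }

    /** Returns everything else, standing in for debezium-json.source.properties. */
    static Map<String, String> remaining(Map<String, String> source) {
        Map<String, String> out = new LinkedHashMap<>(source);
        out.keySet().removeAll(DEDICATED.keySet());
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("db", "inventory");
        source.put("table", "products");
        source.put("connector", "mysql");

        System.out.println(dedicated(source)); // typed, individually addressable keys
        System.out.println(remaining(source)); // vendor-specific leftovers as MAP<STRING, STRING>
    }
}
```

The dedicated keys have a fixed type known at validation time, while a lookup in the remaining map only ever yields a string.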
>> > > > > > > > > > > > > > > > > > > @Jark: You convinced me to relax the CAST constraints. I added a
>> > > > > > > > > > > > > > > > > > > dedicated sub-section to the FLIP:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > To make the use of SYSTEM_METADATA easier and to avoid nested
>> > > > > > > > > > > > > > > > > > > casting, we allow explicit casting to a target data type:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > A connector still produces and consumes the data type returned by
>> > > > > > > > > > > > > > > > > > > `listMetadata()`. The planner will insert necessary explicit casts.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > In any case, the user must provide a CAST such that the computed
>> > > > > > > > > > > > > > > > > > > column receives a valid data type when constructing the table schema.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > "I don't see a reason why `DecodingFormat#applyReadableMetadata`
>> > > > > > > > > > > > > > > > > > > needs a DataType argument."
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Correct, the DeserializationSchema doesn't need TypeInfo, it is
>> > > > > > > > > > > > > > > > > > > always executed locally. It is the source that needs TypeInfo for
>> > > > > > > > > > > > > > > > > > > serializing the record to the next operator. And that is what we
>> > > > > > > > > > > > > > > > > > > provide.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > @Danny:
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > "`SYSTEM_METADATA("offset")` returns the NULL type by default"
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > We can also use some other means to represent an UNKNOWN data type.
>> > > > > > > > > > > > > > > > > > > In the Flink type system, we use the NullType for it. The important
>> > > > > > > > > > > > > > > > > > > part is that the final data type is known for the entire computed
>> > > > > > > > > > > > > > > > > > > column. As I mentioned before, I would avoid the suggested option b)
>> > > > > > > > > > > > > > > > > > > that would be similar to your suggestion. The CAST should be enough
>> > > > > > > > > > > > > > > > > > > and allows for complex expressions in the computed column. Option b)
>> > > > > > > > > > > > > > > > > > > would need parser changes.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > On 08.09.20 06:21, Leonard Xu wrote:
>> > > > > > > > > > > > > > > > > > > > Hi, Timo
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > Thanks for your explanation and update. I have only one question
>> > > > > > > > > > > > > > > > > > > > about the latest FLIP.
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > About the MAP<STRING, STRING> DataType of key 'debezium-json.source':
>> > > > > > > > > > > > > > > > > > > > if users want to use the table name metadata, they need to write:
>> > > > > > > > > > > > > > > > > > > > tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > The expression is a little complex for users. Could we only support
>> > > > > > > > > > > > > > > > > > > > the necessary metadata with simple DataTypes, as follows?
>> > > > > > > > > > > > > > > > > > > > tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
>> > > > > > > > > > > > > > > > > > > > transactionTime BIGINT AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > In this way, we can simplify the expression. From my side, the most
>> > > > > > > > > > > > > > > > > > > > commonly used metadata in changelog formats are 'database', 'table',
>> > > > > > > > > > > > > > > > > > > > 'source.ts_ms' and 'ts_ms'; maybe we could support only them in the
>> > > > > > > > > > > > > > > > > > > > first version.
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > Both Debezium and Canal have the above four metadata fields, and I'm
>> > > > > > > > > > > > > > > > > > > > willing to take some subtasks in the next development if necessary.
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > Debezium:
>> > > > > > > > > > > > > > > > > > > > {
>> > > > > > > > > > > > > > > > > > > >        "before": null,
>> > > > > > > > > > > > > > > > > > > >        "after": { "id": 101, "name": "scooter" },
>> > > > > > > > > > > > > > > > > > > >        "source": {
>> > > > > > > > > > > > > > > > > > > >          "db": "inventory",        # 1. database name the changelog belongs to.
>> > > > > > > > > > > > > > > > > > > >          "table": "products",      # 2. table name the changelog belongs to.
>> > > > > > > > > > > > > > > > > > > >          "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e. transaction time in the database.
>> > > > > > > > > > > > > > > > > > > >          "connector": "mysql",
>> > > > > > > > > > > > > > > > > > > >          ….
>> > > > > > > > > > > > > > > > > > > >        },
>> > > > > > > > > > > > > > > > > > > >        "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
>> > > > > > > > > > > > > > > > > > > >        "op": "c",
>> > > > > > > > > > > > > > > > > > > >        "transaction": null
>> > > > > > > > > > > > > > > > > > > > }
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > Canal:
>> > > > > > > > > > > > > > > > > > > > {
>> > > > > > > > > > > > > > > > > > > >        "data": [{ "id": "102", "name": "car battery" }],
>> > > > > > > > > > > > > > > > > > > >        "database": "inventory",   # 1. database name the changelog belongs to.
>> > > > > > > > > > > > > > > > > > > >        "table": "products",       # 2. table name the changelog belongs to.
>> > > > > > > > > > > > > > > > > > > >        "es": 1589374013000,       # 3. execution time of the change in the database system, i.e. transaction time in the database.
>> > > > > > > > > > > > > > > > > > > >        "ts": 1589374013680,       # 4. timestamp when Canal processed the changelog.
>> > > > > > > > > > > > > > > > > > > >        "isDdl": false,
>> > > > > > > > > > > > > > > > > > > >        "mysqlType": {},
>> > > > > > > > > > > > > > > > > > > >        ....
>> > > > > > > > > > > > > > > > > > > > }
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > Best
>> > > > > > > > > > > > > > > > > > > > Leonard
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > On Sep 8, 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > Thanks Timo ~
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > The FLIP is already in pretty good shape. I have only 2 questions
>> > > > > > > > > > > > > > > > > > > > > here:
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > 1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid
>> > > > > > > > > > > > > > > > > > > > > read-only computed column for Kafka and can be extracted by the
>> > > > > > > > > > > > > > > > > > > > > planner."
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > What are the advantages of following the SQL Server syntax here?
>> > > > > > > > > > > > > > > > > > > > > Usually an expression's return type can be inferred automatically.
>> > > > > > > > > > > > > > > > > > > > > But I guess SQL Server does not have a function like
>> > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA, which does not have a specific return type.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > And why not use the Oracle [1] or MySQL [2] syntax there?
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > That is more straightforward.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > 2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > The default type should not be NULL, because only the NULL literal
>> > > > > > > > > > > > > > > > > > > > > has that type. Usually we use ANY as the type if we do not know the
>> > > > > > > > > > > > > > > > > > > > > specific type in the SQL context. ANY means the physical value can
>> > > > > > > > > > > > > > > > > > > > > be any Java object.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > [1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
>> > > > > > > > > > > > > > > > > > > > > [2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
>> > > > >
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > Danny Chan
>> > > > > > > > > > > > > > > > > > > > > On Sep 4, 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
>> > > > > > > > > > > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > I completely reworked FLIP-107. It now covers the full story how to
>> > > > > > > > > > > > > > > > > > > > > > read and write metadata from/to connectors and formats. It considers
>> > > > > > > > > > > > > > > > > > > > > > all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
>> > > > > > > > > > > > > > > > > > > > > > introduces the concept of PERSISTED computed columns and leaves out
>> > > > > > > > > > > > > > > > > > > > > > partitioning for now.
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > Looking forward to your feedback.
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > > > > > > > > > > Timo
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > On 04.03.20 09:45, Kurt Young wrote:
>> > > > > > > > > > > > > > > > > > > > > > > Sorry, forgot one question.
>> > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > 4. Can we make value.fields-include more orthogonal? For example,
>> > > > > > > > > > > > > > > > > > > > > > > one could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
>> > > > > > > > > > > > > > > > > > > > > > > With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot
>> > > > > > > > > > > > > > > > > > > > > > > configure it to ignore only the timestamp but keep the key.
>> > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > Kurt
>> > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > >
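The orthogonal variant asked for above can be pictured as a set of independent exclusions rather than one enum constant per combination. This is only a sketch: the class name and the comma-separated parsing are assumptions, and only the two option names come from the question itself.

```java
import java.util.EnumSet;

final class FieldsIncludeSketch {
    /** One flag per excluded part, so flags can be combined freely. */
    enum Exclusion { EXCEPT_KEY, EXCEPT_TIMESTAMP }

    /** Parses a comma-separated option value such as "EXCEPT_KEY, EXCEPT_TIMESTAMP". */
    static EnumSet<Exclusion> parse(String option) {
        EnumSet<Exclusion> result = EnumSet.noneOf(Exclusion.class);
        for (String token : option.split(",")) {
            String name = token.trim();
            if (!name.isEmpty()) {
                result.add(Exclusion.valueOf(name)); // throws on unknown names
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Ignore only the timestamp but keep the key, which the combined
        // EXCEPT_KEY / EXCEPT_KEY_TIMESTAMP pair cannot express.
        System.out.println(parse("EXCEPT_TIMESTAMP"));
        System.out.println(parse("EXCEPT_KEY, EXCEPT_TIMESTAMP"));
    }
}
```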
>> > > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > Hi Dawid,
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > I have a couple of questions around key fields. I also have some
>> > > > > > > > > > > > > > > > > > > > > > > > other questions but want to focus on key fields first.
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > 1. I don't fully understand the usage of "key.fields". Is this
>> > > > > > > > > > > > > > > > > > > > > > > > option only valid during write operations? Because for reading, I
>> > > > > > > > > > > > > > > > > > > > > > > > can't imagine how such options can be applied. I would expect that
>> > > > > > > > > > > > > > > > > > > > > > > > there might be a SYSTEM_METADATA("key") to read and assign the key
>> > > > > > > > > > > > > > > > > > > > > > > > to a normal field?
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > 2. If "key.fields" is only valid in write operations, I want to
>> > > > > > > > > > > > > > > > > > > > > > > > propose that we simplify the options by not introducing
>> > > > > > > > > > > > > > > > > > > > > > > > key.format.type and other related options. I think a single
>> > > > > > > > > > > > > > > > > > > > > > > > "key.field" (not fields) would be enough; users can use a UDF to
>> > > > > > > > > > > > > > > > > > > > > > > > calculate whatever key they want before the sink.
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > 3. Also I don't want to introduce "value.format.type" and
>> > > > > > > > > > > > > > > > > > > > > > > > "value.format.xxx" with the "value" prefix. Not every connector
>> > > > > > > > > > > > > > > > > > > > > > > > has a concept of keys and values. The old parameter "format.type"
>> > > > > > > > > > > > > > > > > > > > > > > > is already good enough to use.
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > Kurt
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid,
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > I have two more questions.
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > SupportsMetadata
>> > > > > > > > > > > > > > > > > > > > > > > > > Introducing SupportsMetadata sounds good to me. But I have some
>> > > > > > > > > > > > > > > > > > > > > > > > > questions regarding this interface.
>> > > > > > > > > > > > > > > > > > > > > > > > > 1) How does the source know the expected return type of each
>> > > > > > > > > > > > > > > > > > > > > > > > > metadata field?
>> > > > > > > > > > > > > > > > > > > > > > > > > 2) Where to put the metadata fields? Append them to the existing
>> > > > > > > > > > > > > > > > > > > > > > > > > physical fields?
>> > > > > > > > > > > > > > > > > > > > > > > > > If yes, I would suggest changing the signature to
>> > > > > > > > > > > > > > > > > > > > > > > > > `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA("partition")
>> > > > > > > > > > > > > > > > > > > > > > > > > Can the SYSTEM_METADATA() function be nested in a computed column
>> > > > > > > > > > > > > > > > > > > > > > > > > expression? If yes, how do we specify the return type of
>> > > > > > > > > > > > > > > > > > > > > > > > > SYSTEM_METADATA?
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > > Jark
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > 1. I thought a bit more about how the source would emit the
>> > > > > > > > > > > > > > > > > > > > > > > > > > columns and I now see it's not exactly the same as regular
>> > > > > > > > > > > > > > > > > > > > > > > > > > columns. I see a need to elaborate a bit more on that in the
>> > > > > > > > > > > > > > > > > > > > > > > > > > FLIP as you asked, Jark.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > I do agree mostly with Danny on how we should do that. One
>> > > > > > > > > > > > > > > > > > > > > > > > > > additional thing I would introduce is an
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > interface SupportsMetadata {
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > >     boolean supportsMetadata(Set<String> metadataFields);
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > >     TableSource generateMetadataFields(Set<String> metadataFields);
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > }
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > This way the source would have to declare/emit only the
>> > > > > > > > > > > > > > > > > > > > > > > > > > requested metadata fields. In order not to clash with
>> > > > > > > > > > > > > > > > > > > > > > > > > > user-defined fields, when emitting a metadata field I would
>> > > > > > > > > > > > > > > > > > > > > > > > > > prefix the column name, giving __system_{property_name}.
>> > > > > > > > > > > > > > > > > > > > > > > > > > Therefore, when SYSTEM_METADATA("partition") is requested, the
>> > > > > > > > > > > > > > > > > > > > > > > > > > source would append a field __system_partition to the schema.
>> > > > > > > > > > > > > > > > > > > > > > > > > > This would never be visible to the user, as it would be used
>> > > > > > > > > > > > > > > > > > > > > > > > > > only for the subsequent computed columns. If that makes sense
>> > > > > > > > > > > > > > > > > > > > > > > > > > to you, I will update the FLIP with this description.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
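The naming scheme described above can be sketched as follows. This is not the proposed `SupportsMetadata` interface itself: the class and method names are hypothetical, field names are plain strings instead of a `TableSource` schema, and the sorted order of the appended fields is an arbitrary choice made for determinism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class SystemMetadataFieldsSketch {
    static final String PREFIX = "__system_";

    /** Maps a requested metadata key such as "partition" to its hidden column name. */
    static String fieldNameFor(String metadataKey) {
        return PREFIX + metadataKey;
    }

    /** Appends one hidden column per requested metadata key to the physical fields. */
    static List<String> appendMetadataFields(List<String> physicalFields, Set<String> requested) {
        List<String> schema = new ArrayList<>(physicalFields);
        for (String key : new TreeSet<>(requested)) { // sorted only to keep the output stable
            String name = fieldNameFor(key);
            if (schema.contains(name)) {
                throw new IllegalArgumentException("Field name clashes with metadata column: " + name);
            }
            schema.add(name);
        }
        return schema;
    }

    public static void main(String[] args) {
        // SYSTEM_METADATA("partition") would lead to a hidden __system_partition column.
        System.out.println(appendMetadataFields(List.of("id", "name"), Set.of("partition")));
    }
}
```

Only the requested keys produce columns, which matches the idea that the source emits nothing it was not asked for.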
>> > > > > > > > > > > > > > > > > > > > > > > > > > 2. CAST vs explicit type in computed columns
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > Here I agree with Danny. It is also the current state of the
>> > > > > > > > > > > > > > > > > > > > > > > > > > proposal.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > 3. Partitioning on computed column vs function
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > Here I also agree with Danny. I also think those are
>> > > > > > > > > > > > > > > > > > > > > > > > > > orthogonal. I would leave the STORED computed columns out of
>> > > > > > > > > > > > > > > > > > > > > > > > > > the discussion. I don't see how they relate to the
>> > > > > > > > > > > > > > > > > > > > > > > > > > partitioning. I already put both of those cases in the
>> > > > > > > > > > > > > > > > > > > > > > > > > > document. We can either partition on a computed column or use
>> > > > > > > > > > > > > > > > > > > > > > > > > > a UDF in a PARTITIONED BY clause. I am fine with leaving out
>> > > > > > > > > > > > > > > > > > > > > > > > > > the partitioning by UDF in the first version if you still have
>> > > > > > > > > > > > > > > > > > > > > > > > > > some concerns.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > As for your question, Danny: it depends on which partitioning
>> > > > > > > > > > > > > > > > > > > > > > > > > > strategy you use.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > For the HASH partitioning strategy I thought it would work as
>> > > > > > > > > > > > > > > > > > > > > > > > > > you explained. It would be N = MOD(expr, num). I am not sure
>> > > > > > > > > > > > > > > > > > > > > > > > > > though if we should introduce the PARTITIONS clause. Usually
>> > > > > > > > > > > > > > > > > > > > > > > > > > Flink does not own the data, and the partitions are already an
>> > > > > > > > > > > > > > > > > > > > > > > > > > intrinsic property of the underlying source, e.g. for Kafka we
>> > > > > > > > > > > > > > > > > > > > > > > > > > do not create topics, we just describe a pre-existing,
>> > > > > > > > > > > > > > > > > > > > > > > > > > pre-partitioned topic.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > 4. timestamp vs
>> timestamp.field vs
>> > > > > > > > > > > > > > > > > > > > > > > > > > connector.field vs
>> > > > > > > > > ...
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > I am fine with changing
>> it to timestamp.field to be
>> > > > > > > > > > > > > consistent
>> > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > > > > > > > > > > > other value.fields and
>> key.fields. Actually that
>> > > > > > > > > > > > > > > > > > > > > > > > > > was
>> > > > > > > > > also
>> > > > > > > > > > > my
>> > > > > > > > > > > > > > > > > > > initial
>> > > > > > > > > > > > > > > > > > > > > > > > > > proposal in a first
>> draft I prepared. I changed it
>> > > > > > > > > > > afterwards
>> > > > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > > > shorten
>> > > > > > > > > > > > > > > > > > > > > > > > > > the key.
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > Dawid
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > On 03/03/2020 09:00,
>> Danny Chan wrote:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid for
>> bringing up this discussion, I
>> > > > > think
>> > > > > > > > > it
>> > > > > > > > > > > is
>> > > > > > > > > > > > > a
>> > > > > > > > > > > > > > > > > > > useful
>> > > > > > > > > > > > > > > > > > > > > > > > > > feature ~
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > About how the
>> metadata outputs from source
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > I think it is
>> completely orthogonal, computed
>> > > > > > > > > > > > > > > > > > > > > > > > > > > column
>> > > > > > > > > > push
>> > > > > > > > > > > > > > > down is
>> > > > > > > > > > > > > > > > > > > > > > > > > > another topic, this
>> should not be a blocker but a
>> > > > > > > > > > > promotion,
>> > > > > > > > > > > > > > > if we
>> > > > > > > > > > > > > > > > > > > do
>> > > > > > > > > > > > > > > > > > > > > > > > > not
>> > > > > > > > > > > > > > > > > > > > > > > > > > have any filters on the
>> computed column, there
>> > > > > > > > > > > > > > > > > > > > > > > > > > is no
>> > > > > > > > > need
>> > > > > > > > > > > to
>> > > > > > > > > > > > > > > do any
>> > > > > > > > > > > > > > > > > > > > > > > > > > pushings; the source
>> node just emits the complete
>> > > > > record
>> > > > > > > > > > > with
>> > > > > > > > > > > > > > > full
>> > > > > > > > > > > > > > > > > > > > > > > > > metadata
>> > > > > > > > > > > > > > > > > > > > > > > > > > with the declared
>> physical schema, then when
>> > > > > generating
>> > > > > > > > > > the
>> > > > > > > > > > > > > > > virtual
>> > > > > > > > > > > > > > > > > > > > > > > > > > columns, we would
>> extract the metadata info and
>> > > > > output
>> > > > > > > > > as
>> > > > > > > > > > > > > full
>> > > > > > > > > > > > > > > > > > > > > > > > > columns(with
>> > > > > > > > > > > > > > > > > > > > > > > > > > full schema).
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > About the type of
>> metadata column
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > Personally I prefer
>> explicit type instead of CAST,
>> > > > > > > > > they
>> > > > > > > > > > > are
>> > > > > > > > > > > > > > > > > > > semantically
>> > > > > > > > > > > > > > > > > > > > > > > > > > equivalent though,
>> explicit type is more
>> > > > > > > > > straight-forward
>> > > > > > > > > > > and
>> > > > > > > > > > > > > > > we can
>> > > > > > > > > > > > > > > > > > > > > > > > > declare
>> > > > > > > > > > > > > > > > > > > > > > > > > > the nullable attribute
>> there.
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > About option A:
>> partitioning based on a computed
>> > > > > column
>> > > > > > > > > > VS
>> > > > > > > > > > > > > > > option
>> > > > > > > > > > > > > > > > > > > B:
>> > > > > > > > > > > > > > > > > > > > > > > > > > partitioning with just
>> a function
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > >      From the FLIP,
>> it seems that B's
>> > > > > > > > > > > > > > > > > > > > > > > > > > > partitioning is
>> > > > > > > > > > just
>> > > > > > > > > > > a
>> > > > > > > > > > > > > > > strategy
>> > > > > > > > > > > > > > > > > > > when
>> > > > > > > > > > > > > > > > > > > > > > > > > > writing data, the
>> partition column is not
>> > > > > > > > > > > > > > > > > > > > > > > > > > included in
>> > > > > > > > > the
>> > > > > > > > > > > > > table
>> > > > > > > > > > > > > > > > > > > schema,
>> > > > > > > > > > > > > > > > > > > > > > > > > so
>> > > > > > > > > > > > > > > > > > > > > > > > > > it's just useless when
>> reading from that.
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > - Compared to A, we
>> do not need to generate the
>> > > > > > > > > > partition
>> > > > > > > > > > > > > > > column
>> > > > > > > > > > > > > > > > > > > when
>> > > > > > > > > > > > > > > > > > > > > > > > > > selecting from the
>> table(but insert into)
>> > > > > > > > > > > > > > > > > > > > > > > > > > > - For A we can also
>> mark the column as STORED when
>> > > > > we
>> > > > > > > > > > want
>> > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > > > persist
>> > > > > > > > > > > > > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > So in my opinion they
>> are orthogonal, we can
>> > > > > > > > > > > > > > > > > > > > > > > > > > > support
>> > > > > > > > > > > both. I
>> > > > > > > > > > > > > > > saw
>> > > > > > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > > > > > > > > > MySQL/Oracle[1][2]
>> would suggest to also define the
>> > > > > > > > > > > > > PARTITIONS
>> > > > > > > > > > > > > > > > > > > num, and
>> > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > partitions are managed
>> under a "tablenamespace",
>> > > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > partition
>> > > > > > > > > > > > > > > in
>> > > > > > > > > > > > > > > > > > > which
>> > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > record is stored is
>> partition number N, where N =
>> > > > > > > > > > MOD(expr,
>> > > > > > > > > > > > > > > num),
>> > > > > > > > > > > > > > > > > > > for
>> > > > > > > > > > > > > > > > > > > > > > > > > your
>> > > > > > > > > > > > > > > > > > > > > > > > > > design, in which partition
>> would the record be persisted?
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > [1]
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > >
>> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
>> > > > > > > > > > > > > > > > > > > > > > > > > > > [2]
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > >
>> > > > >
>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
>> > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > Danny Chan
>> > > > > > > > > > > > > > > > > > > > > > > > > > > On March 2, 2020, 6:16 PM +0800,
>> Dawid Wysakowicz <
>> > > > > > > > > > > > > > > dwysakow...@apache.org
>> > > > > > > > > > > > > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jark,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Ad. 2 I added a
>> section to discuss relation to
>> > > > > > > > > FLIP-63
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Ad. 3 Yes, I also
>> tried to somewhat keep
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > hierarchy
>> > > > > of
>> > > > > > > > > > > > > > > properties.
>> > > > > > > > > > > > > > > > > > > > > > > > > > Therefore you have the
>> key.format.type.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > I also considered
>> exactly what you are suggesting
>> > > > > > > > > > > > > (prefixing
>> > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > > > > > > > > > > > connector or kafka). I
>> should've put that into an
>> > > > > > > > > > > > > > > Option/Rejected
>> > > > > > > > > > > > > > > > > > > > > > > > > > alternatives.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree timestamp,
>> key.*, value.* are connector
>> > > > > > > > > > > properties.
>> > > > > > > > > > > > > > > Why I
>> > > > > > > > > > > > > > > > > > > > > > > > > > wanted to suggest not
>> adding that prefix in the
>> > > > > > > > > > > > > > > > > > > > > > > > > > first
>> > > > > > > > > > > version
>> > > > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > > > > > > > > > actually all the
>> properties in the WITH section are
>> > > > > > > > > > > connector
>> > > > > > > > > > > > > > > > > > > > > > > > > properties.
>> > > > > > > > > > > > > > > > > > > > > > > > > > Even format is in the
>> end a connector property as
>> > > > > some
>> > > > > > > > > of
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > sources
>> > > > > > > > > > > > > > > > > > > > > > > > > might
>> > > > > > > > > > > > > > > > > > > > > > > > > > not have a format, imo.
>> The benefit of not
>> > > > > > > > > > > > > > > > > > > > > > > > > > adding the
>> > > > > > > > > > > prefix
>> > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > > > > > that it
>> > > > > > > > > > > > > > > > > > > > > > > > > > makes the keys a bit
>> shorter. Imagine prefixing all
>> > > > > the
>> > > > > > > > > > > > > > > properties
>> > > > > > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > > > > > > > > > > > connector (or if we go
>> with FLINK-12557:
>> > > > > > > > > elasticsearch):
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> elasticsearch.key.format.type: csv
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> elasticsearch.key.format.field: ....
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> elasticsearch.key.format.delimiter: ....
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> elasticsearch.key.format.*: ....
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > I am fine with
>> doing it though if this is a
>> > > > > preferred
>> > > > > > > > > > > > > > > approach
>> > > > > > > > > > > > > > > > > > > in the
>> > > > > > > > > > > > > > > > > > > > > > > > > > community.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Ad in-line comments:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > I forgot to update
>> the `value.fields.include`
>> > > > > > > > > property.
>> > > > > > > > > > > It
>> > > > > > > > > > > > > > > > > > > should be
>> > > > > > > > > > > > > > > > > > > > > > > > > > value.fields-include.
>> Which I think you also
>> > > > > suggested
>> > > > > > > > > in
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > comment,
>> > > > > > > > > > > > > > > > > > > > > > > > > > right?
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > As for the cast vs
>> declaring output type of
>> > > > > computed
>> > > > > > > > > > > > > column.
>> > > > > > > > > > > > > > > I
>> > > > > > > > > > > > > > > > > > > think
>> > > > > > > > > > > > > > > > > > > > > > > > > > it's better not to use
>> CAST, but declare a type
>> > > > > > > > > > > > > > > > > > > > > > > > > > of an
>> > > > > > > > > > > > > > > expression
>> > > > > > > > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > > > > > > > > > > > later
>> > > > > > > > > > > > > > > > > > > > > > > > > > on infer the output
>> type of SYSTEM_METADATA. The
>> > > > > reason
>> > > > > > > > > > is
>> > > > > > > > > > > I
>> > > > > > > > > > > > > > > think
>> > > > > > > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > > > > > > > > > way
>> > > > > > > > > > > > > > > > > > > > > > > > > > it will be easier to
>> implement e.g. filter push
>> > > > > > > > > > > > > > > > > > > > > > > > > > downs
>> > > > > > > > > > when
>> > > > > > > > > > > > > > > working
>> > > > > > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > native types of the
>> source, e.g. in case of Kafka's
>> > > > > > > > > > > offset, I
>> > > > > > > > > > > > > > > > > > > think it's
>> > > > > > > > > > > > > > > > > > > > > > > > > > better to pushdown long
>> rather than string. This
>> > > > > could
>> > > > > > > > > > let
>> > > > > > > > > > > us
>> > > > > > > > > > > > > > > push
>> > > > > > > > > > > > > > > > > > > > > > > > > > expression like e.g.
>> offset > 12345 && offset <
>> > > > > > > > > > > > > > > > > > > > > > > > > > 59382.
>> > > > > > > > > > > > > > > Otherwise we
>> > > > > > > > > > > > > > > > > > > would
>> > > > > > > > > > > > > > > > > > > > > > > > > > have to push down
>> cast(offset, long) > 12345 &&
>> > > > > > > > > > > cast(offset,
>> > > > > > > > > > > > > > > long)
>> > > > > > > > > > > > > > > > > > > <
>> > > > > > > > > > > > > > > > > > > > > > > > > 59382.
>> > > > > > > > > > > > > > > > > > > > > > > > > > Moreover I think we
>> need to introduce the type for
>> > > > > > > > > > computed
>> > > > > > > > > > > > > > > columns
>> > > > > > > > > > > > > > > > > > > > > > > > > anyway
>> > > > > > > > > > > > > > > > > > > > > > > > > > to support functions
>> that infer output type
>> > > > > > > > > > > > > > > > > > > > > > > > > > based on
>> > > > > > > > > > > expected
>> > > > > > > > > > > > > > > > > > > return
>> > > > > > > > > > > > > > > > > > > > > > > > > type.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > As for the computed
>> column push down. Yes,
>> > > > > > > > > > > SYSTEM_METADATA
>> > > > > > > > > > > > > > > would
>> > > > > > > > > > > > > > > > > > > have
>> > > > > > > > > > > > > > > > > > > > > > > > > > to be pushed down to
>> the source. If it is not
>> > > > > possible
>> > > > > > > > > > the
>> > > > > > > > > > > > > > > planner
>> > > > > > > > > > > > > > > > > > > > > > > > > should
>> > > > > > > > > > > > > > > > > > > > > > > > > > fail. As far as I know
>> computed columns push down
>> > > > > will
>> > > > > > > > > be
>> > > > > > > > > > > > > part
>> > > > > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > > > > source
>> > > > > > > > > > > > > > > > > > > > > > > > > > rework, won't it? ;)
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > As for the
>> persisted computed column. I think
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > it is
>> > > > > > > > > > > > > > > completely
>> > > > > > > > > > > > > > > > > > > > > > > > > > orthogonal. In my
>> current proposal you can also
>> > > > > > > > > partition
>> > > > > > > > > > > by
>> > > > > > > > > > > > > a
>> > > > > > > > > > > > > > > > > > > computed
>> > > > > > > > > > > > > > > > > > > > > > > > > > column. The difference
>> between using a udf in
>> > > > > > > > > partitioned
>> > > > > > > > > > > by
>> > > > > > > > > > > > > vs
>> > > > > > > > > > > > > > > > > > > > > > > > > partitioned
>> > > > > > > > > > > > > > > > > > > > > > > > > > by a computed column is
>> that when you partition
>> > > > > > > > > > > > > > > > > > > > > > > > > > by a
>> > > > > > > > > > > computed
>> > > > > > > > > > > > > > > > > > > column
>> > > > > > > > > > > > > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > > > > > > > > > > column must be also
>> computed when reading the
>> > > > > > > > > > > > > > > > > > > > > > > > > > table.
>> > > > > If
>> > > > > > > > > > you
>> > > > > > > > > > > > > > > use a
>> > > > > > > > > > > > > > > > > > > udf in
>> > > > > > > > > > > > > > > > > > > > > > > > > > the partitioned by, the
>> expression is computed only
>> > > > > > > > > when
>> > > > > > > > > > > > > > > inserting
>> > > > > > > > > > > > > > > > > > > into
>> > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > table.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Hope this answers
>> some of your questions. Looking
>> > > > > > > > > > forward
>> > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > > > further
>> > > > > > > > > > > > > > > > > > > > > > > > > > suggestions.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > Dawid
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > On 02/03/2020
>> 05:18, Jark Wu wrote:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Dawid for
>> starting such a great
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > discussion.
>> > > > > > > > > > > Reading
>> > > > > > > > > > > > > > > > > > > metadata
>> > > > > > > > > > > > > > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > key-part
>> information from source is an important
>> > > > > > > > > > feature
>> > > > > > > > > > > > > for
>> > > > > > > > > > > > > > > > > > > > > > > > > streaming
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > users.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > In general, I
>> agree with the proposal of the
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > FLIP.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I will leave my
>> thoughts and comments here:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) +1 to use
>> connector properties instead of
>> > > > > > > > > > introducing
>> > > > > > > > > > > > > > > HEADER
>> > > > > > > > > > > > > > > > > > > > > > > > > > keyword as
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > the reason you
>> mentioned in the FLIP.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2) we already
>> introduced PARTITIONED BY in
>> > > > > FLIP-63.
>> > > > > > > > > > > Maybe
>> > > > > > > > > > > > > we
>> > > > > > > > > > > > > > > > > > > should
>> > > > > > > > > > > > > > > > > > > > > > > > > > add a
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > section to
>> explain what's the relationship
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > between
>> > > > > > > > > > them.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Do their concepts
>> conflict? Could INSERT
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > PARTITION
>> > > > > > > > > be
>> > > > > > > > > > > used
>> > > > > > > > > > > > > > > on
>> > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > PARTITIONED table
>> in this FLIP?
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3) Currently,
>> properties are hierarchical in
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Flink
>> > > > > > > > > > SQL.
>> > > > > > > > > > > > > > > Shall we
>> > > > > > > > > > > > > > > > > > > > > > > > > make
>> > > > > > > > > > > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > newly introduced
>> properties more hierarchical?
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > For example,
>> "timestamp" =>
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> "connector.timestamp"?
>> > > > > > > > > > > > > > > (actually, I
>> > > > > > > > > > > > > > > > > > > > > > > > > > prefer
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > "kafka.timestamp"
>> which is another
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > improvement for
>> > > > > > > > > > > > > > > properties
>> > > > > > > > > > > > > > > > > > > > > > > > > > FLINK-12557)
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > A single
>> "timestamp" in properties may mislead
>> > > > > users
>> > > > > > > > > > > that
>> > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > > > > > > field
>> > > > > > > > > > > > > > > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > a rowtime
>> attribute.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I also left some
>> minor comments in the FLIP.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jark
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sun, 1 Mar
>> 2020 at 22:30, Dawid Wysakowicz <
>> > > > > > > > > > > > > > > > > > > > > > > > > dwysakow...@apache.org>
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I would like to
>> propose an improvement that
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > would
>> > > > > > > > > > > enable
>> > > > > > > > > > > > > > > > > > > reading
>> > > > > > > > > > > > > > > > > > > > > > > > > table
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > columns from
>> different parts of source records.
>> > > > > > > > > > Besides
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > main
>> > > > > > > > > > > > > > > > > > > > > > > > > > payload
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > the majority (if
>> not all) of the sources expose
>> > > > > > > > > > additional
>> > > > > > > > > > > > > > > > > > > > > > > > > information. It
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > can be simply a
>> read-only metadata such as
>> > > > > offset,
>> > > > > > > > > > > > > > > ingestion
>> > > > > > > > > > > > > > > > > > > time
>> > > > > > > > > > > > > > > > > > > > > > > > > or a
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > read and write
>> parts of the record that contain
>> > > > > > > > > data
>> > > > > > > > > > > but
>> > > > > > > > > > > > > > > > > > > > > > > > > additionally
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > serve different
>> purposes (partitioning,
>> > > > > compaction
>> > > > > > > > > > > etc.),
>> > > > > > > > > > > > > > > e.g.
>> > > > > > > > > > > > > > > > > > > key
>> > > > > > > > > > > > > > > > > > > > > > > > > or
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > timestamp in
>> Kafka.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We should make
>> it possible to read and write
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > data
>> > > > > > > > > > from
>> > > > > > > > > > > > > all
>> > > > > > > > > > > > > > > of
>> > > > > > > > > > > > > > > > > > > those
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > locations. In
>> this proposal I discuss reading
>> > > > > > > > > > > > > partitioning
>> > > > > > > > > > > > > > > > > > > data,
>> > > > > > > > > > > > > > > > > > > > > > > > > for
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > completeness
>> this proposal discusses also the
>> > > > > > > > > > > > > partitioning
>> > > > > > > > > > > > > > > when
>> > > > > > > > > > > > > > > > > > > > > > > > > > writing
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > data out.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I am looking
>> forward to your comments.
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > You can access
>> the FLIP here:
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > >
>> > > > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>> > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Dawid
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
