Hi Danny,

This is not Oracle or MySQL computed column syntax, because there is no
"AS" after the type.
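
For comparison, here is a rough sketch (the table and column names are made
up for illustration). In the MySQL generated column form that you quoted
earlier in this thread, the expression follows "AS" after the type, while the
proposed metadata column has no "AS" and no expression at all:

```sql
-- MySQL-style generated column:
--   column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
CREATE TABLE person (
  height_cm DOUBLE,
  height_in DOUBLE GENERATED ALWAYS AS (height_cm / 2.54) VIRTUAL
);

-- Proposed metadata column from this thread (not settled syntax):
-- a type followed by SYSTEM_METADATA(...), with no AS and no expression.
--   offset INT SYSTEM_METADATA("offset")
```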

Hi everyone,

If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
must further discuss the "PERSISTED" or "VIRTUAL" keyword for the query-sink
schema problem.
Personally, I think we can use a shorter keyword "METADATA" instead of
"SYSTEM_METADATA", because "SYSTEM_METADATA" sounds like a system function
and may confuse users into thinking this is a computed column.
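
As a rough sketch of what I mean (hypothetical syntax only: the "METADATA"
keyword, the placement of "VIRTUAL", the column types and the exact semantics
are all assumptions that still need discussion), a read-only metadata column
would be marked so that it is left out of the sink schema, while a writable
one stays part of it:

```sql
-- Hypothetical syntax, not an agreed design.
-- `timestamp` and `offset` are escaped because they are likely reserved words.
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- writable metadata: part of the query-sink schema
  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA("timestamp"),
  -- read-only metadata: assumed to be excluded from the sink schema
  `offset` INT METADATA("offset") VIRTUAL
) WITH (
  ...
);

-- Under that assumption the query provides id, name and the timestamp only.
INSERT INTO kafka_table
SELECT id, name, rowtime FROM another_table;
```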


Best,
Jark



On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:

> "offset INT SYSTEM_METADATA("offset")"
>
> This is actually Oracle or MySQL style computed column syntax.
>
> "You are right that one could argue that "timestamp", "headers" are
> something like "key" and "value""
>
> I have the same feeling: key, value, headers and timestamp are all *real*
> data stored in the consumed record; they are not computed or generated.
>
> "Trying to solve everything via properties sounds rather like a hack to
> me"
>
> Things are not that hacky if we can unify the routines or the definitions
> (all via computed columns or all via table options). I also
> think it is hacky to mix two kinds of syntax for different
> kinds of metadata (read-only and read-write). In this FLIP, we declare the
> Kafka key fields with table options but use SYSTEM_METADATA for other metadata,
> which is hacky, or at least inconsistent.
>
> On Wed, 9 Sep 2020 at 4:48 PM, Kurt Young <ykt...@gmail.com> wrote:
>
> >  I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >
> > I don't think we can stick with the SQL standard in DDL part forever,
> > especially as there are more and more
> > requirements coming from different connectors and external systems.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Jark,
> > >
> > > now we are back at the original design proposed by Dawid :D Yes, we
> > > should be cautious about adding new syntax. But the length of this
> > > discussion shows that we are looking for a good long-term solution. In
> > > this case I would rather vote for a deep integration into the syntax.
> > >
> > > Computed columns are also not SQL standard compliant. And our DDL is
> > > neither, so we have some degree of freedom here.
> > >
> > > Trying to solve everything via properties sounds rather like a hack to
> > > me. You are right that one could argue that "timestamp", "headers" are
> > > something like "key" and "value". However, mixing
> > >
> > > `offset AS SYSTEM_METADATA("offset")`
> > >
> > > and
> > >
> > > `'timestamp.field' = 'ts'`
> > >
> > > looks more confusing to users than an explicit
> > >
> > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> > >
> > > or
> > >
> > > `offset INT SYSTEM_METADATA("offset")`
> > >
> > > that is symmetric for both source and sink.
> > >
> > > What do others think?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 09.09.20 10:09, Jark Wu wrote:
> > > > Hi everyone,
> > > >
> > > > I think we have a conclusion that the writable metadata shouldn't be
> > > > defined as a computed column, but a normal column.
> > > >
> > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> > approaches.
> > > > However, it is not SQL standard compliant, we need to be cautious
> > enough
> > > > when adding new syntax.
> > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > > > resolve the query-sink schema problem if it is read-only metadata.
> That
> > > > adds more stuff to learn for users.
> > > >
> > > > > From my point of view, "timestamp" and "headers" are something like
> > > > > "key" and "value" that are stored with the real data. So why not define the
> > > > > "timestamp" in the same way as "key" by using a "timestamp.field"
> > > > > connector option?
> > > > > On the other hand, read-only metadata, such as "offset", shouldn't be
> > > > > defined as a normal column. So why not use the existing computed column
> > > > > syntax for such metadata? Then we don't have the query-sink schema
> > > > > problem.
> > > > So here is my proposal:
> > > >
> > > > CREATE TABLE kafka_table (
> > > >    id BIGINT,
> > > >    name STRING,
> > > >    col1 STRING,
> > > >    col2 STRING,
> > > > >    ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts is a normal field, so it
> > > > >                                             -- can be read and written.
> > > > >    offset AS SYSTEM_METADATA("offset")
> > > > ) WITH (
> > > >    'connector' = 'kafka',
> > > >    'topic' = 'test-topic',
> > > >    'key.fields' = 'id, name',
> > > >    'key.format' = 'csv',
> > > >    'value.format' = 'avro',
> > > > >    'timestamp.field' = 'ts'    -- define the mapping of Kafka timestamp
> > > > > );
> > > >
> > > > INSERT INTO kafka_table
> > > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > > >
> > > > > I think this can solve all the problems without introducing any new
> > > > > syntax.
> > > > > The only minor disadvantage is that we separate the definition way/syntax
> > > > > of read-only metadata and read-write fields.
> > > > > However, I don't think this is a big problem.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
> > > >
> > > >> Hi Kurt,
> > > >>
> > > > >> thanks for sharing your opinion. I'm totally up for not reusing computed
> > > > >> columns. I think Jark was a big supporter of this syntax, @Jark are you
> > > > >> fine with this as well? The non-computed column approach was only a
> > > > >> "slightly rejected alternative".
> > > >>
> > > >> Furthermore, we would need to think about how such a new design
> > > >> influences the LIKE clause though.
> > > >>
> > > > >> However, we should still keep the `PERSISTED` keyword as it influences
> > > > >> the query->sink schema. If you look at the list of metadata for existing
> > > > >> connectors and formats, we currently offer only two writable metadata
> > > > >> fields. Otherwise, one would need to declare two tables whenever a
> > > > >> metadata column is read (one for the source, one for the sink). This
> > > > >> can be quite inconvenient, e.g. for just reading the topic.
> > > >>
> > > >> Regards,
> > > >> Timo
> > > >>
> > > >>
> > > >> On 09.09.20 08:52, Kurt Young wrote:
> > > > >>> I also share the concern that reusing the computed column syntax with
> > > > >>> different semantics would confuse users a lot.
> > > >>>
> > > > >>> Besides, I think metadata fields are conceptually not the same as
> > > > >>> computed columns. A metadata field is a connector-specific thing and it
> > > > >>> only carries the information of where the field comes from (for a source)
> > > > >>> or where the field needs to be written to (for a sink). It's more similar
> > > > >>> to normal fields, with the assumption that all these fields go to the data
> > > > >>> part.
> > > >>>
> > > > >>> Thus I lean more toward the rejected alternative that Timo mentioned. And I
> > > > >>> think we don't need the
> > > > >>> PERSISTED keyword; SYSTEM_METADATA should be enough.
> > > >>>
> > > > >>> During implementation, the framework only needs to pass such <field,
> > > > >>> metadata field> information to the
> > > > >>> connector, and the logic of handling such fields inside the connector
> > > > >>> should be straightforward.
> > > >>>
> > > >>> Regarding the downside Timo mentioned:
> > > >>>
> > > > >>>> The disadvantage is that users cannot call UDFs or parse timestamps.
> > > >>>
> > > > >>> I think this is fairly simple to solve. Since the metadata field isn't a
> > > > >>> computed column anymore, we can support
> > > > >>> referencing such fields in the computed column. For example:
> > > >>>
> > > >>> CREATE TABLE kafka_table (
> > > >>>        id BIGINT,
> > > >>>        name STRING,
> > > > >>>        timestamp STRING SYSTEM_METADATA("timestamp"),  // get the timestamp
> > > > >>>                                                        // field from metadata
> > > > >>>        ts AS to_timestamp(timestamp) // normal computed column, parses the
> > > > >>>                                      // string to TIMESTAMP type using the metadata field
> > > >>> ) WITH (
> > > >>>       ...
> > > >>> )
> > > >>>
> > > >>> Best,
> > > >>> Kurt
> > > >>>
> > > >>>
> > > > >>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
> > > >>>
> > > >>>> Hi Leonard,
> > > >>>>
> > > > >>>> the only alternative I see is that we introduce a concept that is
> > > > >>>> completely different to computed columns. This is also mentioned in the
> > > > >>>> rejected alternative section of the FLIP. Something like:
> > > >>>>
> > > >>>> CREATE TABLE kafka_table (
> > > >>>>        id BIGINT,
> > > >>>>        name STRING,
> > > >>>>        timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> > > > >>>>        headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
> > > >>>> ) WITH (
> > > >>>>       ...
> > > >>>> )
> > > >>>>
> > > > >>>> This way we would avoid confusion altogether and can easily map columns to
> > > > >>>> metadata columns. The disadvantage is that users cannot call UDFs or
> > > > >>>> parse timestamps. This would need to be done in a real computed column.
> > > >>>>
> > > >>>> I'm happy about better alternatives.
> > > >>>>
> > > >>>> Regards,
> > > >>>> Timo
> > > >>>>
> > > >>>>
> > > >>>> On 08.09.20 15:37, Leonard Xu wrote:
> > > >>>>> HI, Timo
> > > >>>>>
> > > >>>>> Thanks for driving this FLIP.
> > > >>>>>
> > > > >>>>> Sorry, but I have a concern about the "Writing metadata via DynamicTableSink"
> > > > >>>>> section:
> > > >>>>>
> > > >>>>> CREATE TABLE kafka_table (
> > > >>>>>      id BIGINT,
> > > >>>>>      name STRING,
> > > > >>>>>      timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
> > > > >>>>>      headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
> > > >>>>> ) WITH (
> > > >>>>>      ...
> > > >>>>> )
> > > >>>>> An insert statement could look like:
> > > >>>>>
> > > >>>>> INSERT INTO kafka_table VALUES (
> > > >>>>>      (1, "ABC", 1599133672, MAP('checksum',
> computeChecksum(...)))
> > > >>>>> )
> > > >>>>>
> > > > >>>>> The proposed INSERT syntax does not make sense to me, because it contains
> > > > >>>>> computed (generated) columns.
> > > > >>>>> Neither SQL Server nor PostgreSQL allows inserting values into computed
> > > > >>>>> columns, even if they are persisted. This breaks the generated column
> > > > >>>>> semantics and may confuse users a lot.
> > > >>>>>
> > > > >>>>> For SQL Server computed columns [1]:
> > > > >>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> > > > >>>>>> NOTE: A computed column cannot be the target of an INSERT or UPDATE
> > > > >>>>>> statement.
> > > >>>>>
> > > > >>>>> For PostgreSQL generated columns [2]:
> > > > >>>>>>     height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> > > > >>>>>> NOTE: A generated column cannot be written to directly. In INSERT or
> > > > >>>>>> UPDATE commands, a value cannot be specified for a generated column, but
> > > > >>>>>> the keyword DEFAULT may be specified.
> > > >>>>>
> > > > >>>>> After looking up SQL:2016, setting or updating a value for a generated
> > > > >>>>> column shouldn't be allowed either:
> > > > >>>>>> <insert statement> ::=
> > > > >>>>>> INSERT INTO <insertion target> <insert columns and source>
> > > > >>>>>>
> > > > >>>>>> If <contextually typed table value constructor> CTTVC is specified,
> > > > >>>>>> then every <contextually typed row value constructor element> simply
> > > > >>>>>> contained in CTTVC whose positionally corresponding <column name>
> > > > >>>>>> in <insert column list> references a column of which some underlying
> > > > >>>>>> column is a generated column shall be a <default specification>.
> > > > >>>>>> A <default specification> specifies the default value of some
> > > > >>>>>> associated item.
> > > >>>>>
> > > >>>>>
> > > >>>>> [1]
> > > >>>>
> > > >>
> > >
> >
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > >>>> <
> > > >>>>
> > > >>
> > >
> >
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > > >>>>>
> > > >>>>> [2]
> https://www.postgresql.org/docs/12/ddl-generated-columns.html
> > <
> > > >>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> > > >>>>>
> > > > >>>>>> On 8 Sep 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>> Hi Jark,
> > > >>>>>>
> > > > >>>>>> according to Flink's and Calcite's casting definition in [1][2],
> > > > >>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we
> > > > >>>>>> will make it possible ;-)
> > > >>>>>>
> > > > >>>>>> I'm aware of DeserializationSchema.getProducedType but I think that
> > > > >>>>>> this method is actually misplaced. The type should rather be passed to the
> > > > >>>>>> source itself.
> > > >>>>>>
> > > > >>>>>> For our Kafka SQL source, we will also not use this method because the
> > > > >>>>>> Kafka source will add its own metadata in addition to the
> > > > >>>>>> DeserializationSchema. So DeserializationSchema.getProducedType will never
> > > > >>>>>> be read.
> > > >>>>>>
> > > > >>>>>> For now I suggest leaving out the `DataType` from
> > > > >>>>>> DecodingFormat.applyReadableMetadata, also because the format's physical
> > > > >>>>>> type is passed later in `createRuntimeDecoder`. If necessary, it can be
> > > > >>>>>> computed manually from consumedType + metadata types. We will provide a
> > > > >>>>>> metadata utility class for that.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Timo
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> > > >>>>>> [2]
> > > >>>>
> > > >>
> > >
> >
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 08.09.20 10:52, Jark Wu wrote:
> > > >>>>>>> Hi Timo,
> > > > >>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
> > > > >>>>>>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
> > > > >>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
> > > > >>>>>>> ZONE" as the defined type of Kafka timestamp? I think this makes sense,
> > > > >>>>>>> because it represents the milliseconds since epoch.
> > > > >>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
> > > > >>>>>>> The DeserializationSchema implements ResultTypeQueryable, thus the
> > > > >>>>>>> implementation needs to return an output TypeInfo.
> > > > >>>>>>> Besides, FlinkKafkaConsumer also
> > > > >>>>>>> calls DeserializationSchema.getProducedType as the produced type of the
> > > > >>>>>>> source function [1].
> > > >>>>>>> Best,
> > > >>>>>>> Jark
> > > >>>>>>> [1]:
> > > >>>>>>>
> > > >>>>
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> > > >>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org>
> > > >> wrote:
> > > >>>>>>>> Hi everyone,
> > > >>>>>>>>
> > > > >>>>>>>> I updated the FLIP again and hope that I could address the mentioned
> > > > >>>>>>>> concerns.
> > > > >>>>>>>>
> > > > >>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
> > > > >>>>>>>> source.ts_ms have different semantics. I updated the FLIP and expose the
> > > > >>>>>>>> most commonly used properties separately. So frequently used properties
> > > > >>>>>>>> are not hidden in the MAP anymore:
> > > >>>>>>>>
> > > >>>>>>>> debezium-json.ingestion-timestamp
> > > >>>>>>>> debezium-json.source.timestamp
> > > >>>>>>>> debezium-json.source.database
> > > >>>>>>>> debezium-json.source.schema
> > > >>>>>>>> debezium-json.source.table
> > > >>>>>>>>
> > > > >>>>>>>> However, since other properties depend on the used connector/vendor, the
> > > > >>>>>>>> remaining options are stored in:
> > > >>>>>>>>
> > > >>>>>>>> debezium-json.source.properties
> > > >>>>>>>>
> > > >>>>>>>> And accessed with:
> > > >>>>>>>>
> > > > >>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING,
> > > > >>>>>>>> STRING>)['table']
> > > >>>>>>>>
> > > > >>>>>>>> Otherwise it is not possible to figure out the value and column type
> > > > >>>>>>>> during validation.
> > > >>>>>>>>
> > > > >>>>>>>> @Jark: You convinced me to relax the CAST constraints. I added a
> > > > >>>>>>>> dedicated sub-section to the FLIP:
> > > >>>>>>>>
> > > > >>>>>>>> To make the use of SYSTEM_METADATA easier and avoid nested casting, we
> > > > >>>>>>>> allow explicit casting to a target data type:
> > > > >>>>>>>>
> > > > >>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL
> > > > >>>>>>>> TIME ZONE)
> > > >>>>>>>>
> > > > >>>>>>>> A connector still produces and consumes the data type returned by
> > > > >>>>>>>> `listMetadata()`. The planner will insert necessary explicit casts.
> > > >>>>>>>>
> > > > >>>>>>>> In any case, the user must provide a CAST such that the computed column
> > > > >>>>>>>> receives a valid data type when constructing the table schema.
> > > >>>>>>>>
> > > >>>>>>>> "I don't see a reason why
> `DecodingFormat#applyReadableMetadata`
> > > >>>> needs a
> > > >>>>>>>> DataType argument."
> > > >>>>>>>>
> > > > >>>>>>>> Correct, the DeserializationSchema doesn't need TypeInfo; it is always
> > > > >>>>>>>> executed locally. It is the source that needs TypeInfo for serializing
> > > > >>>>>>>> the record to the next operator. And this is what we provide.
> > > >>>>>>>>
> > > >>>>>>>> @Danny:
> > > >>>>>>>>
> > > > >>>>>>>> "`SYSTEM_METADATA("offset")` returns the NULL type by default"
> > > >>>>>>>>
> > > > >>>>>>>> We can also use some other means to represent an UNKNOWN data type. In
> > > > >>>>>>>> the Flink type system, we use the NullType for it. The important part is
> > > > >>>>>>>> that the final data type is known for the entire computed column. As I
> > > > >>>>>>>> mentioned before, I would avoid the suggested option b) that would be
> > > > >>>>>>>> similar to your suggestion. The CAST should be enough and allows for
> > > > >>>>>>>> complex expressions in the computed column. Option b) would need parser
> > > > >>>>>>>> changes.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> > > >>>>>>>>> Hi, Timo
> > > >>>>>>>>>
> > > > >>>>>>>>> Thanks for your explanation and update. I have only one question for
> > > > >>>>>>>>> the latest FLIP.
> > > >>>>>>>>>
> > > > >>>>>>>>> About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if
> > > > >>>>>>>>> users want to use the table name metadata, they need to write:
> > > > >>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS
> > > > >>>>>>>>> MAP<STRING, STRING>)['table']
> > > >>>>>>>>>
> > > > >>>>>>>>> The expression is a little complex for users. Could we only support the
> > > > >>>>>>>>> necessary metadata with simple DataTypes, as follows?
> > > > >>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS
> > > > >>>>>>>>> STRING),
> > > > >>>>>>>>> transactionTime LONG AS
> > > > >>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),
> > > >>>>>>>>>
> > > > >>>>>>>>> In this way, we can simplify the expression. From my side, the mainly used
> > > > >>>>>>>>> metadata in changelog formats may include 'database', 'table',
> > > > >>>>>>>>> 'source.ts_ms' and 'ts_ms';
> > > > >>>>>>>>> maybe we could only support them in the first version.
> > > >>>>>>>>>
> > > > >>>>>>>>> Both Debezium and Canal have the above four metadata fields, and I'm willing to
> > > > >>>>>>>>> take some subtasks in the next development steps if necessary.
> > > >>>>>>>>>
> > > >>>>>>>>> Debezium:
> > > >>>>>>>>> {
> > > >>>>>>>>>       "before": null,
> > > >>>>>>>>>       "after": {  "id": 101,"name": "scooter"},
> > > >>>>>>>>>       "source": {
> > > >>>>>>>>>         "db": "inventory",                  # 1. database
> name
> > > the
> > > >>>>>>>> changelog belongs to.
> > > >>>>>>>>>         "table": "products",                # 2. table name
> the
> > > >>>> changelog
> > > >>>>>>>> belongs to.
> > > >>>>>>>>>         "ts_ms": 1589355504100,             # 3. timestamp of
> > the
> > > >>>> change
> > > >>>>>>>> happened in database system, i.e.: transaction time in
> database.
> > > >>>>>>>>>         "connector": "mysql",
> > > >>>>>>>>>         ….
> > > >>>>>>>>>       },
> > > >>>>>>>>>       "ts_ms": 1589355606100,              # 4. timestamp
> when
> > > the
> > > >>>> debezium
> > > >>>>>>>> processed the changelog.
> > > >>>>>>>>>       "op": "c",
> > > >>>>>>>>>       "transaction": null
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>> Canal:
> > > >>>>>>>>> {
> > > >>>>>>>>>       "data": [{  "id": "102", "name": "car battery" }],
> > > >>>>>>>>>       "database": "inventory",      # 1. database name the
> > > changelog
> > > >>>>>>>> belongs to.
> > > >>>>>>>>>       "table": "products",          # 2. table name the
> > changelog
> > > >>>> belongs
> > > >>>>>>>> to.
> > > >>>>>>>>>       "es": 1589374013000,          # 3. execution time of
> the
> > > >> change
> > > >>>> in
> > > >>>>>>>> database system, i.e.: transaction time in database.
> > > >>>>>>>>>       "ts": 1589374013680,          # 4. timestamp when the
> > > cannal
> > > >>>>>>>> processed the changelog.
> > > >>>>>>>>>       "isDdl": false,
> > > >>>>>>>>>       "mysqlType": {},
> > > >>>>>>>>>       ....
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Best
> > > >>>>>>>>> Leonard
> > > >>>>>>>>>
> > > > >>>>>>>>>> On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks Timo ~
> > > >>>>>>>>>>
> > > > >>>>>>>>>> The FLIP is already in pretty good shape. I have only 2 questions here:
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > > >>>>>>>>>> 1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only
> > > > >>>>>>>>>> computed column for Kafka and can be extracted by the planner."
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > > >>>>>>>>>> What are the pros of following the SQL Server syntax here? Usually an
> > > > >>>>>>>>>> expression's return type can be inferred automatically. But I guess
> > > > >>>>>>>>>> SQL Server does not have a function like SYSTEM_METADATA, which actually
> > > > >>>>>>>>>> does not have a specific return type.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> And why not use the Oracle or MySQL syntax there?
> > > >>>>>>>>>>
> > > > >>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
> > > > >>>>>>>>>> Which is more straightforward.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> 2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"
> > > >>>>>>>>>>
> > > > >>>>>>>>>> The default type should not be NULL because only the NULL literal has
> > > > >>>>>>>>>> that type. Usually we use ANY as the type if we do not know the specific
> > > > >>>>>>>>>> type in the SQL context. ANY means the physical value can be any Java
> > > > >>>>>>>>>> object.
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1]
> > https://oracle-base.com/articles/11g/virtual-columns-11gr1
> > > >>>>>>>>>> [2]
> > > >>>>>>>>
> > > >>>>
> > > >>
> > >
> >
> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Danny Chan
> > > > >>>>>>>>>> On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I completely reworked FLIP-107. It now covers the full story of how to
> > > > >>>>>>>>>>> read and write metadata from/to connectors and formats. It considers
> > > > >>>>>>>>>>> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> > > > >>>>>>>>>>> introduces the concept of PERSISTED computed columns and leaves out
> > > > >>>>>>>>>>> partitioning for now.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Looking forward to your feedback.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Regards,
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> > > >>>>>>>>>>>> Sorry, forgot one question.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 4. Can we make the value.fields-include more orthogonal? Like one can
> > > > >>>>>>>>>>>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> > > > >>>>>>>>>>>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot
> > > > >>>>>>>>>>>> configure it to just ignore the timestamp but keep the key.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Kurt
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Dawid,
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I have a couple of questions around key fields. Actually, I also have some
> > > > >>>>>>>>>>>>> other questions but want to focus on key fields first.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is this option only
> > > > >>>>>>>>>>>>> valid during write operations? Because for
> > > > >>>>>>>>>>>>> reading, I can't imagine how such options can be applied. I would expect
> > > > >>>>>>>>>>>>> that there might be a SYSTEM_METADATA("key")
> > > > >>>>>>>>>>>>> to read and assign the key to a normal field?
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. If "key.fields" is only valid in write operations, I want to propose that
> > > > >>>>>>>>>>>>> we simplify the options by not introducing key.format.type and
> > > > >>>>>>>>>>>>> other related options. I think a single "key.field" (not fields) would be
> > > > >>>>>>>>>>>>> enough; users can use a UDF to calculate whatever key they
> > > > >>>>>>>>>>>>> want before the sink.
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. Also I don't want to introduce "value.format.type" and
> > > > >>>>>>>>>>>>> "value.format.xxx" with the "value" prefix. Not every connector has a
> > > > >>>>>>>>>>>>> concept of keys and values. The old parameter "format.type" is already good
> > > > >>>>>>>>>>>>> enough to use.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Kurt
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks Dawid,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I have two more questions.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> SupportsMetadata
> > > > >>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I have some questions
> > > > >>>>>>>>>>>>>> regarding this interface.
> > > > >>>>>>>>>>>>>> 1) How does the source know the expected return type of each metadata field?
> > > > >>>>>>>>>>>>>> 2) Where to put the metadata fields? Append them to the existing physical
> > > > >>>>>>>>>>>>>> fields?
> > > > >>>>>>>>>>>>>> If yes, I would suggest changing the signature to `TableSource
> > > > >>>>>>>>>>>>>> appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition")
> > > > >>>>>>>>>>>>>> Can the SYSTEM_METADATA() function be nested in a computed column
> > > > >>>>>>>>>>>>>> expression? If yes, how to specify the return type of SYSTEM_METADATA?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 1. I thought a bit more about how the source would emit the columns and I
> > > > >>>>>>>>>>>>>>> now see it's not exactly the same as regular columns. I see a need to
> > > > >>>>>>>>>>>>>>> elaborate a bit more on that in the FLIP as you asked, Jark.
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I do agree mostly with Danny on how we should do that. One additional
> > > > >>>>>>>>>>>>>>> thing I would introduce is an
> > > >>>>>>>>>>>>>>> interface SupportsMetadata {
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields);
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> TableSource generateMetadataFields(Set<String> metadataFields);
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> This way the source would have to declare/emit only the requested
> > > > >>>>>>>>>>>>>>> metadata fields. In order not to clash with user-defined fields, when
> > > > >>>>>>>>>>>>>>> emitting the metadata field I would prepend the column name with
> > > > >>>>>>>>>>>>>>> __system_{property_name}. Therefore when
> > > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition") is requested, the source would append a field
> > > > >>>>>>>>>>>>>>> __system_partition to the schema. This would never be visible to the
> > > > >>>>>>>>>>>>>>> user as it would be used only for the subsequent computed columns. If
> > > > >>>>>>>>>>>>>>> that makes sense to you, I will update the FLIP with this description.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state of the proposal.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 3. Partitioning on computed column vs function
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are orthogonal. I would
> > > > >>>>>>>>>>>>>>> leave the STORED computed columns out of the discussion. I don't see
> > > > >>>>>>>>>>>>>>> how they relate to the partitioning. I already put both of those
> > > > >>>>>>>>>>>>>>> cases in the document. We can either partition on a computed column or
> > > > >>>>>>>>>>>>>>> use a UDF in a partitioned by clause. I am fine with leaving out the
> > > > >>>>>>>>>>>>>>> partitioning by UDF in the first version if you still have some
> > > > >>>>>>>>>>>>>>> concerns.
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> As for your question, Danny: it depends on which partitioning strategy you
> > > > >>>>>>>>>>>>>>> use.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For the HASH partitioning strategy I thought it would
> > work
> > > as
> > > >>>> you
> > > >>>>>>>>>>>>>>> explained. It would be N = MOD(expr, num). I am not
> sure
> > > >>>> though if
> > > >>>>>>>> we
> > > >>>>>>>>>>>>>>> should introduce the PARTITIONS clause. Usually Flink
> > does
> > > >> not
> > > >>>> own
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>> data and the partitions are already an intrinsic
> property
> > > of
> > > >>>> the
> > > >>>>>>>>>>>>>>> underlying source e.g. for kafka we do not create
> topics,
> > > but
> > > >>>> we
> > > >>>>>>>> just
> > > >>>>>>>>>>>>>>> describe a pre-existing, pre-partitioned topic.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs
> ...
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be
> > > >> consistent
> > > >>>> with
> > > >>>>>>>>>>>>>>> other value.fields and key.fields. Actually that was
> also
> > > my
> > > >>>>>>>> initial
> > > >>>>>>>>>>>>>>> proposal in a first draft I prepared. I changed it
> > > afterwards
> > > >>>> to
> > > >>>>>>>> shorten
> > > >>>>>>>>>>>>>>> the key.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Dawid
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> > > >>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think
> it
> > > is
> > > >> a
> > > >>>>>>>> useful
> > > >>>>>>>>>>>>>>> feature ~
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About how the metadata outputs from source
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I think it is completely orthogonal, computed column
> > push
> > > >>>> down is
> > > >>>>>>>>>>>>>>> another topic, this should not be a blocker but a
> > > promotion,
> > > >>>> if we
> > > >>>>>>>> do
> > > >>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>> have any filters on the computed column, there is no
> need
> > > to
> > > >>>> do any
> > > >>>>>>>>>>>>>>> push-down; the source node just emits the complete record
> > > with
> > > >>>> full
> > > >>>>>>>>>>>>>> metadata
> > > >>>>>>>>>>>>>>> with the declared physical schema, then when generating
> > the
> > > >>>> virtual
> > > >>>>>>>>>>>>>>> columns, we would extract the metadata info and output
> as
> > > >> full
> > > >>>>>>>>>>>>>> columns(with
> > > >>>>>>>>>>>>>>> full schema).
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About the type of metadata column
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Personally I prefer an explicit type instead of CAST;
> they
> > > are
> > > >>>>>>>> semantically
> > > >>>>>>>>>>>>>>> equivalent though, an explicit type is more
> straight-forward
> > > and
> > > >>>> we can
> > > >>>>>>>>>>>>>> declare
> > > >>>>>>>>>>>>>>> the nullable attribute there.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> About option A: partitioning based on a computed column
> > VS
> > > >>>> option
> > > >>>>>>>> B:
> > > >>>>>>>>>>>>>>> partitioning with just a function
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>     From the FLIP, it seems that B's partitioning is
> > just
> > > a
> > > >>>> strategy
> > > >>>>>>>> when
> > > >>>>>>>>>>>>>>> writing data, the partition column is not included in
> the
> > > >> table
> > > >>>>>>>> schema,
> > > >>>>>>>>>>>>>> so
> > > >>>>>>>>>>>>>>> it's just useless when reading from that.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the
> > partition
> > > >>>> column
> > > >>>>>>>> when
> > > >>>>>>>>>>>>>>> selecting from the table (only when inserting into it)
> > > >>>>>>>>>>>>>>>> - For A we can also mark the column as STORED when we
> > want
> > > >> to
> > > >>>>>>>> persist
> > > >>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> So in my opinion they are orthogonal, we can support
> > > both, I
> > > >>>> saw
> > > >>>>>>>> that
> > > >>>>>>>>>>>>>>> MySQL/Oracle[1][2] would suggest to also define the
> > > >> PARTITIONS
> > > >>>>>>>> num, and
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> partitions are managed under a "tablenamespace", the
> > > >> partition
> > > >>>> in
> > > >>>>>>>> which
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> record is stored is partition number N, where N =
> > MOD(expr,
> > > >>>> num),
> > > >>>>>>>> for
> > > >>>>>>>>>>>>>> your
> > > >>>>>>>>>>>>>>> design, which partition would the record be persisted to?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> [1]
> > > >>>>>>>>
> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> > > >>>>>>>>>>>>>>>> [2]
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>
> > > >>>>
> > > >>
> > >
> >
> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>> Danny Chan
> > > >>>>>>>>>>>>>>>> On 2 March 2020 at 6:16 PM +0800, Dawid Wysakowicz <
> > > >>>> dwysakow...@apache.org
> > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>>>>>> Ad. 2 I added a section to discuss relation to
> FLIP-63
> > > >>>>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of
> > > >>>> properties.
> > > >>>>>>>>>>>>>>> Therefore you have the key.format.type.
> > > >>>>>>>>>>>>>>>>> I also considered exactly what you are suggesting
> > > >> (prefixing
> > > >>>> with
> > > >>>>>>>>>>>>>>> connector or kafka). I should've put that into an
> > > >>>> Option/Rejected
> > > >>>>>>>>>>>>>>> alternatives.
> > > >>>>>>>>>>>>>>>>> I agree timestamp, key.*, value.* are connector
> > > properties.
> > > >>>> Why I
> > > >>>>>>>>>>>>>>> wanted to suggest not adding that prefix in the first
> > > version
> > > >>>> is
> > > >>>>>>>> that
> > > >>>>>>>>>>>>>>> actually all the properties in the WITH section are
> > > connector
> > > >>>>>>>>>>>>>> properties.
> > > >>>>>>>>>>>>>>> Even format is in the end a connector property as some
> of
> > > the
> > > >>>>>>>> sources
> > > >>>>>>>>>>>>>> might
> > > >>>>>>>>>>>>>>> not have a format, imo. The benefit of not adding the
> > > prefix
> > > >> is
> > > >>>>>>>> that it
> > > >>>>>>>>>>>>>>> makes the keys a bit shorter. Imagine prefixing all the
> > > >>>> properties
> > > >>>>>>>> with
> > > >>>>>>>>>>>>>>> connector (or if we go with FLINK-12557:
> elasticsearch):
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> > > >>>>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> > > >>>>>>>>>>>>>>>>> I am fine with doing it though if this is a preferred
> > > >>>> approach
> > > >>>>>>>> in the
> > > >>>>>>>>>>>>>>> community.
> > > >>>>>>>>>>>>>>>>> Ad in-line comments:
> > > >>>>>>>>>>>>>>>>> I forgot to update the `value.fields.include`
> property.
> > > It
> > > >>>>>>>> should be
> > > >>>>>>>>>>>>>>> value.fields-include. Which I think you also suggested
> in
> > > the
> > > >>>>>>>> comment,
> > > >>>>>>>>>>>>>>> right?
> > > >>>>>>>>>>>>>>>>> As for the cast vs declaring output type of computed
> > > >> column.
> > > >>>> I
> > > >>>>>>>> think
> > > >>>>>>>>>>>>>>> it's better not to use CAST, but declare a type of an
> > > >>>> expression
> > > >>>>>>>> and
> > > >>>>>>>>>>>>>> later
> > > >>>>>>>>>>>>>>> on infer the output type of SYSTEM_METADATA. The reason
> > is
> > > I
> > > >>>> think
> > > >>>>>>>> this
> > > >>>>>>>>>>>>>> way
> > > >>>>>>>>>>>>>>> it will be easier to implement e.g. filter push downs
> > when
> > > >>>> working
> > > >>>>>>>> with
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> native types of the source, e.g. in case of Kafka's
> > > offset, I
> > > >>>>>>>> think it's
> > > >>>>>>>>>>>>>>> better to pushdown long rather than string. This could
> > let
> > > us
> > > >>>> push
> > > >>>>>>>>>>>>>>> expressions such as offset > 12345 && offset < 59382.
> > > >>>> Otherwise we
> > > >>>>>>>> would
> > > >>>>>>>>>>>>>>> have to push down cast(offset, long) > 12345 &&
> > > cast(offset,
> > > >>>> long)
> > > >>>>>>>> <
> > > >>>>>>>>>>>>>> 59382.
> > > >>>>>>>>>>>>>>> Moreover I think we need to introduce the type for
> > computed
> > > >>>> columns
> > > >>>>>>>>>>>>>> anyway
> > > >>>>>>>>>>>>>>> to support functions that infer output type based on
> > > expected
> > > >>>>>>>> return
> > > >>>>>>>>>>>>>> type.
> > > >>>>>>>>>>>>>>>>> As for the computed column push down. Yes,
> > > SYSTEM_METADATA
> > > >>>> would
> > > >>>>>>>> have
> > > >>>>>>>>>>>>>>> to be pushed down to the source. If it is not possible
> > the
> > > >>>> planner
> > > >>>>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>> fail. As far as I know computed columns push down will
> be
> > > >> part
> > > >>>> of
> > > >>>>>>>> source
> > > >>>>>>>>>>>>>>> rework, won't it? ;)
> > > >>>>>>>>>>>>>>>>> As for the persisted computed column. I think it is
> > > >>>> completely
> > > >>>>>>>>>>>>>>> orthogonal. In my current proposal you can also
> partition
> > > by
> > > >> a
> > > >>>>>>>> computed
> > > >>>>>>>>>>>>>>> column. The difference between using a udf in
> partitioned
> > > by
> > > >> vs
> > > >>>>>>>>>>>>>> partitioned
> > > >>>>>>>>>>>>>>> by a computed column is that when you partition by a
> > > computed
> > > >>>>>>>> column
> > > >>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>> column must be also computed when reading the table. If
> > you
> > > >>>> use a
> > > >>>>>>>> udf in
> > > >>>>>>>>>>>>>>> the partitioned by, the expression is computed only
> when
> > > >>>> inserting
> > > >>>>>>>> into
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> table.
> > > >>>>>>>>>>>>>>>>> Hope this answers some of your questions. Looking
> > forward
> > > >> to
> > > >>>>>>>> further
> > > >>>>>>>>>>>>>>> suggestions.
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Dawid
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> > > >>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion.
> > > Reading
> > > >>>>>>>> metadata
> > > >>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>> key-part information from source is an important
> > feature
> > > >> for
> > > >>>>>>>>>>>>>> streaming
> > > >>>>>>>>>>>>>>>>>> users.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP.
> > > >>>>>>>>>>>>>>>>>> I will leave my thoughts and comments here:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> 1) +1 to use connector properties instead of
> > introducing
> > > >>>> HEADER
> > > >>>>>>>>>>>>>>> keyword as
> > > >>>>>>>>>>>>>>>>>> the reason you mentioned in the FLIP.
> > > >>>>>>>>>>>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63.
> > > Maybe
> > > >> we
> > > >>>>>>>> should
> > > >>>>>>>>>>>>>>> add a
> > > >>>>>>>>>>>>>>>>>> section to explain what's the relationship between
> > them.
> > > >>>>>>>>>>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION
> be
> > > used
> > > >>>> on
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> PARTITIONED table in this FLIP?
> > > >>>>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink
> > SQL.
> > > >>>> Shall we
> > > >>>>>>>>>>>>>> make
> > > >>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> newly introduced properties more hierarchical?
> > > >>>>>>>>>>>>>>>>>> For example, "timestamp" => "connector.timestamp"?
> > > >>>> (actually, I
> > > >>>>>>>>>>>>>>> prefer
> > > >>>>>>>>>>>>>>>>>> "kafka.timestamp" which is another improvement for
> > > >>>> properties
> > > >>>>>>>>>>>>>>> FLINK-12557)
> > > >>>>>>>>>>>>>>>>>> A single "timestamp" in properties may mislead users
> > > that
> > > >>>> the
> > > >>>>>>>>>>>>>> field
> > > >>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>>> a rowtime attribute.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <
> > > >>>>>>>>>>>>>> dwysakow...@apache.org>
> > > >>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I would like to propose an improvement that would
> > > enable
> > > >>>>>>>> reading
> > > >>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> columns from different parts of source records.
> > Besides
> > > >> the
> > > >>>>>>>> main
> > > >>>>>>>>>>>>>>> payload
> > > >>>>>>>>>>>>>>>>>>> the majority (if not all) of the sources expose
> > additional
> > > >>>>>>>>>>>>>> information. It
> > > >>>>>>>>>>>>>>>>>>> can be simply read-only metadata such as offset,
> > > >>>> ingestion
> > > >>>>>>>> time
> > > >>>>>>>>>>>>>> or a
> > > >>>>>>>>>>>>>>>>>>> read and write parts of the record that contain
> data
> > > but
> > > >>>>>>>>>>>>>> additionally
> > > >>>>>>>>>>>>>>>>>>> serve different purposes (partitioning, compaction
> > > etc.),
> > > >>>> e.g.
> > > >>>>>>>> key
> > > >>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>>> timestamp in Kafka.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> We should make it possible to read and write data
> > from
> > > >> all
> > > >>>> of
> > > >>>>>>>> those
> > > >>>>>>>>>>>>>>>>>>> locations. In this proposal I discuss reading
> > > >> partitioning
> > > >>>>>>>> data;
> > > >>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>> completeness this proposal also discusses the
> > > >> partitioning
> > > >>>> when
> > > >>>>>>>>>>>>>>> writing
> > > >>>>>>>>>>>>>>>>>>> data out.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I am looking forward to your comments.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> You can access the FLIP here:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>
> > > >>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Dawid
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>
