Hi everyone,

I think we have reached the conclusion that writable metadata should be
defined as a normal column, not as a computed column.

"timestamp STRING SYSTEM_METADATA('timestamp')" is one of the approaches.
However, it is not SQL standard compliant, and we need to be cautious
when adding new syntax.
Besides, we would have to introduce the `PERSISTED` or `VIRTUAL` keyword to
resolve the query-sink schema problem for read-only metadata. That
adds more for users to learn.

From my point of view, "timestamp" and "headers" are similar to "key"
and "value": they are stored with the real data. So why not define the
"timestamp" in the same way as "key", by using a "timestamp.field"
connector option?
On the other hand, read-only metadata, such as "offset", shouldn't be
defined as a normal column. So why not use the existing computed column
syntax for such metadata? Then we don't have the query-sink schema problem.
So here is my proposal:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  col1 STRING,
  col2 STRING,
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts is a normal field, so it can be read and written
  offset AS SYSTEM_METADATA("offset")
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'key.fields' = 'id, name',
  'key.format' = 'csv',
  'value.format' = 'avro',
  'timestamp.field' = 'ts'    -- define the mapping of Kafka timestamp
);

INSERT INTO kafka_table
SELECT id, name, col1, col2, rowtime FROM another_table;

I think this can solve all the problems without introducing any new syntax.
The only minor disadvantage is that read-only metadata and read-write fields
are defined in different ways (computed column syntax vs. a connector option).
However, I don't think this is a big problem.
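
To make the query-sink schema point concrete, here is a minimal Python sketch.
It is not Flink planner code: the `Column` class and the two helper functions
are hypothetical names made up for illustration. It assumes the rule discussed
in this thread, namely that a query sees every declared column while a sink
only accepts the normal (physical) columns, so a computed column such as
`offset` is skipped on write.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Column:
    """Hypothetical model of one column in a CREATE TABLE statement."""
    name: str
    computed: bool = False  # True for `col AS <expr>` columns


# Columns of the proposed kafka_table, in declaration order.
KAFKA_TABLE = [
    Column("id"),
    Column("name"),
    Column("col1"),
    Column("col2"),
    Column("ts"),                     # normal column, mapped via 'timestamp.field'
    Column("offset", computed=True),  # read-only metadata as a computed column
]


def query_schema(columns: List[Column]) -> List[str]:
    """Schema seen by SELECT: all columns, computed ones included."""
    return [c.name for c in columns]


def sink_schema(columns: List[Column]) -> List[str]:
    """Schema expected by INSERT INTO: computed columns are excluded."""
    return [c.name for c in columns if not c.computed]


if __name__ == "__main__":
    print(query_schema(KAFKA_TABLE))
    print(sink_schema(KAFKA_TABLE))
```

Under this model, the INSERT statement above supplies exactly the five sink
columns (id, name, col1, col2, ts), while a SELECT on the same table
additionally exposes offset.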

Best,
Jark


On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:

> Hi Kurt,
>
> thanks for sharing your opinion. I'm totally up for not reusing computed
> columns. I think Jark was a big supporter of this syntax, @Jark are you
> fine with this as well? The non-computed column approach was only a
> "slightly rejected alternative".
>
> Furthermore, we would need to think about how such a new design
> influences the LIKE clause though.
>
> However, we should still keep the `PERSISTED` keyword as it influences
> the query->sink schema. If you look at the list of metadata for existing
> connectors and formats, we currently offer only two writable metadata
> fields. Otherwise, one would need to declare two tables whenever a
> metadata column is read (one for the source, one for the sink). This
> can be quite inconvenient, e.g. for just reading the topic.
>
> Regards,
> Timo
>
>
> On 09.09.20 08:52, Kurt Young wrote:
> > I also share the concern that reusing the computed column syntax with
> > different semantics would confuse users a lot.
> >
> > Besides, I think metadata fields are conceptually not the same as
> > computed columns. A metadata field is a connector-specific thing, and it
> > only carries the information about where the field comes from (in a
> > source) or where the field needs to be written to (in a sink). It's more
> > similar to normal fields, with the assumption that all these fields go
> > to the data part.
> >
> > Thus I lean more towards the rejected alternative that Timo mentioned. And
> > I think we don't need the PERSISTED keyword; SYSTEM_METADATA should be
> > enough.
> >
> > During implementation, the framework only needs to pass such <field,
> > metadata field> information to the
> > connector, and the logic of handling such fields inside the connector
> > should be straightforward.
> >
> > Regarding the downside Timo mentioned:
> >
> >> The disadvantage is that users cannot call UDFs or parse timestamps.
> >
> > I think this is fairly simple to solve. Since the metadata field isn't a
> > computed column anymore, we can support
> > referencing such fields in the computed column. For example:
> >
> > CREATE TABLE kafka_table (
> >       id BIGINT,
> >       name STRING,
> >       timestamp STRING SYSTEM_METADATA("timestamp"),  -- get the timestamp field from metadata
> >       ts AS to_timestamp(timestamp)  -- normal computed column, parses the string to TIMESTAMP type by using the metadata field
> > ) WITH (
> >      ...
> > )
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Leonard,
> >>
> >> the only alternative I see is that we introduce a concept that is
> >> completely different to computed columns. This is also mentioned in the
> >> rejected alternative section of the FLIP. Something like:
> >>
> >> CREATE TABLE kafka_table (
> >>       id BIGINT,
> >>       name STRING,
> >>       timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> >>       headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") PERSISTED
> >> ) WITH (
> >>      ...
> >> )
> >>
> >> This way we would avoid confusion altogether and can easily map columns to
> >> metadata columns. The disadvantage is that users cannot call UDFs or
> >> parse timestamps. This would need to be done in a real computed column.
> >>
> >> I'm happy about better alternatives.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 08.09.20 15:37, Leonard Xu wrote:
> >>> Hi Timo,
> >>>
> >>> Thanks for driving this FLIP.
> >>>
> >>> Sorry, but I have a concern about the "Writing metadata via DynamicTableSink"
> >>> section:
> >>>
> >>> CREATE TABLE kafka_table (
> >>>     id BIGINT,
> >>>     name STRING,
> >>>     timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) PERSISTED,
> >>>     headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>) PERSISTED
> >>> ) WITH (
> >>>     ...
> >>> )
> >>> An insert statement could look like:
> >>>
> >>> INSERT INTO kafka_table VALUES (
> >>>     (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> >>> )
> >>>
> >>> The proposed INSERT syntax does not make sense to me, because it writes
> >>> to computed (generated) columns.
> >>> Neither SQL Server nor PostgreSQL allows inserting values into computed
> >>> columns, even if they are persisted. This breaks the generated column
> >>> semantics and may confuse users a lot.
> >>>
> >>> For SQL Server computed columns [1]:
> >>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] ]...
> >>>> NOTE: A computed column cannot be the target of an INSERT or UPDATE
> >>>> statement.
> >>>
> >>> For PostgreSQL generated columns [2]:
> >>>>    height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
> >>>> NOTE: A generated column cannot be written to directly. In INSERT or
> >>>> UPDATE commands, a value cannot be specified for a generated column, but
> >>>> the keyword DEFAULT may be specified.
> >>>
> >>> After looking at SQL:2016, setting or updating the value of a generated
> >>> column shouldn't be allowed either:
> >>>> <insert statement> ::=
> >>>> INSERT INTO <insertion target> <insert columns and source>
> >>>>
> >>>> If <contextually typed table value constructor> CTTVC is specified,
> >>>> then every <contextually typed row value constructor element> simply
> >>>> contained in CTTVC whose positionally corresponding <column name>
> >>>> in <insert column list> references a column of which some underlying
> >>>> column is a generated column shall be a <default specification>.
> >>>> A <default specification> specifies the default value of some
> >>>> associated item.
> >>>
> >>>
> >>> [1] https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
> >>>
> >>>
> >>>> On 8 Sep 2020, at 17:31, Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>> Hi Jark,
> >>>>
> >>>> according to Flink's and Calcite's casting definition in [1][2],
> >>>> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If not, we
> >>>> will make it possible ;-)
> >>>>
> >>>> I'm aware of DeserializationSchema.getProducedType but I think that
> >>>> this method is actually misplaced. The type should rather be passed to the
> >>>> source itself.
> >>>>
> >>>> For our Kafka SQL source, we will also not use this method because the
> >>>> Kafka source will add its own metadata in addition to the
> >>>> DeserializationSchema. So DeserializationSchema.getProducedType will never
> >>>> be read.
> >>>>
> >>>> For now I suggest leaving out the `DataType` from
> >>>> DecodingFormat.applyReadableMetadata, also because the format's physical
> >>>> type is passed later in `createRuntimeDecoder`. If necessary, it can be
> >>>> computed manually from consumedType + metadata types. We will provide a
> >>>> metadata utility class for that.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> >>>> [2] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> >>>>
> >>>>
> >>>> On 08.09.20 10:52, Jark Wu wrote:
> >>>>> Hi Timo,
> >>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I just noticed
> >>>>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME ZONE".
> >>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL TIME
> >>>>> ZONE" as the defined type of the Kafka timestamp? I think this makes sense,
> >>>>> because it represents the milliseconds since epoch.
> >>>>>
> >>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't think so.
> >>>>> The DeserializationSchema implements ResultTypeQueryable, thus the
> >>>>> implementation needs to return an output TypeInfo.
> >>>>> Besides, FlinkKafkaConsumer also calls
> >>>>> DeserializationSchema.getProducedType as the produced type of the
> >>>>> source function [1].
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> [1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> >>>>>
> >>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> I updated the FLIP again and hope that I could address the mentioned
> >>>>>> concerns.
> >>>>>>
> >>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms and
> >>>>>> source.ts_ms have different semantics. I updated the FLIP and now expose
> >>>>>> the most commonly used properties separately, so frequently used
> >>>>>> properties are not hidden in the MAP anymore:
> >>>>>>
> >>>>>> debezium-json.ingestion-timestamp
> >>>>>> debezium-json.source.timestamp
> >>>>>> debezium-json.source.database
> >>>>>> debezium-json.source.schema
> >>>>>> debezium-json.source.table
> >>>>>>
> >>>>>> However, since other properties depend on the connector/vendor in use,
> >>>>>> the remaining options are stored in:
> >>>>>>
> >>>>>> debezium-json.source.properties
> >>>>>>
> >>>>>> And accessed with:
> >>>>>>
> >>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']
> >>>>>>
> >>>>>> Otherwise it is not possible to figure out the value and column type
> >>>>>> during validation.
> >>>>>>
> >>>>>> @Jark: You convinced me to relax the CAST constraints. I added a
> >>>>>> dedicated sub-section to the FLIP:
> >>>>>>
> >>>>>> To make the use of SYSTEM_METADATA easier and to avoid nested casting,
> >>>>>> we allow explicit casting to a target data type:
> >>>>>>
> >>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)
> >>>>>>
> >>>>>> A connector still produces and consumes the data type returned by
> >>>>>> `listMetadata()`. The planner will insert necessary explicit casts.
> >>>>>>
> >>>>>> In any case, the user must provide a CAST such that the computed column
> >>>>>> receives a valid data type when constructing the table schema.
> >>>>>>
> >>>>>> "I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a
> >>>>>> DataType argument."
> >>>>>>
> >>>>>> Correct, the DeserializationSchema doesn't need TypeInfo; it is always
> >>>>>> executed locally. It is the source that needs TypeInfo for serializing
> >>>>>> the record to the next operator. And this is what we provide.
> >>>>>>
> >>>>>> @Danny:
> >>>>>>
> >>>>>> “`SYSTEM_METADATA("offset")` returns the NULL type by default”
> >>>>>>
> >>>>>> We can also use some other means to represent an UNKNOWN data type. In
> >>>>>> the Flink type system, we use the NullType for it. The important part is
> >>>>>> that the final data type is known for the entire computed column. As I
> >>>>>> mentioned before, I would avoid the suggested option b) that would be
> >>>>>> similar to your suggestion. The CAST should be enough and allows for
> >>>>>> complex expressions in the computed column. Option b) would need parser
> >>>>>> changes.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> >>>>>>> Hi, Timo
> >>>>>>>
> >>>>>>> Thanks for your explanation and update. I have only one question about
> >>>>>>> the latest FLIP.
> >>>>>>>
> >>>>>>> About the MAP<STRING, STRING> DataType of key 'debezium-json.source': if
> >>>>>>> users want to use the table name metadata, they need to write:
> >>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']
> >>>>>>>
> >>>>>>> The expression is a little complex for users. Could we only support the
> >>>>>>> necessary metadata with simple DataTypes, as follows?
> >>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
> >>>>>>> transactionTime BIGINT AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),
> >>>>>>>
> >>>>>>> In this way, we can simplify the expression. From my side, the most
> >>>>>>> commonly used metadata in changelog formats are 'database', 'table',
> >>>>>>> 'source.ts_ms' and 'ts_ms';
> >>>>>>> maybe we could support only them in the first version.
> >>>>>>>
> >>>>>>> Both Debezium and Canal have the above four metadata fields, and I'm
> >>>>>>> willing to take some subtasks in the upcoming development if necessary.
> >>>>>>>
> >>>>>>> Debezium:
> >>>>>>> {
> >>>>>>>      "before": null,
> >>>>>>>      "after": {  "id": 101,"name": "scooter"},
> >>>>>>>      "source": {
> >>>>>>>        "db": "inventory",                  # 1. database name the changelog belongs to.
> >>>>>>>        "table": "products",                # 2. table name the changelog belongs to.
> >>>>>>>        "ts_ms": 1589355504100,             # 3. timestamp when the change happened in the database system, i.e. transaction time in the database.
> >>>>>>>        "connector": "mysql",
> >>>>>>>        ….
> >>>>>>>      },
> >>>>>>>      "ts_ms": 1589355606100,              # 4. timestamp when Debezium processed the changelog.
> >>>>>>>      "op": "c",
> >>>>>>>      "transaction": null
> >>>>>>> }
> >>>>>>>
> >>>>>>> Canal:
> >>>>>>> {
> >>>>>>>      "data": [{  "id": "102", "name": "car battery" }],
> >>>>>>>      "database": "inventory",      # 1. database name the changelog belongs to.
> >>>>>>>      "table": "products",          # 2. table name the changelog belongs to.
> >>>>>>>      "es": 1589374013000,          # 3. execution time of the change in the database system, i.e. transaction time in the database.
> >>>>>>>      "ts": 1589374013680,          # 4. timestamp when Canal processed the changelog.
> >>>>>>>      "isDdl": false,
> >>>>>>>      "mysqlType": {},
> >>>>>>>      ....
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> Best
> >>>>>>> Leonard
> >>>>>>>
> >>>>>>>> On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Timo ~
> >>>>>>>>
> >>>>>>>> The FLIP was already in pretty good shape. I have only 2 questions here:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only
> >>>>>>>> computed column for Kafka and can be extracted by the planner.”
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> What are the pros of following the SQL Server syntax here? Usually the
> >>>>>>>> return type of an expression can be inferred automatically. But I guess
> >>>>>>>> SQL Server does not have a function like SYSTEM_METADATA, which actually
> >>>>>>>> does not have a specific return type.
> >>>>>>>>
> >>>>>>>> And why not use the Oracle [1] or MySQL [2] syntax there?
> >>>>>>>>
> >>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]
> >>>>>>>> which is more straightforward.
> >>>>>>>>
> >>>>>>>> 2. “`SYSTEM_METADATA("offset")` returns the NULL type by default”
> >>>>>>>>
> >>>>>>>> The default type should not be NULL, because only the NULL literal has
> >>>>>>>> that type. Usually we use ANY as the type if we do not know the specific
> >>>>>>>> type in the SQL context. ANY means the physical value can be any Java
> >>>>>>>> object.
> >>>>>>>>
> >>>>>>>> [1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
> >>>>>>>> [2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Danny Chan
> >>>>>>>> On 4 Sep 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> I completely reworked FLIP-107. It now covers the full story of how to
> >>>>>>>>> read and write metadata from/to connectors and formats. It considers all
> >>>>>>>>> of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces
> >>>>>>>>> the concept of PERSISTED computed columns and leaves out partitioning
> >>>>>>>>> for now.
> >>>>>>>>>
> >>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> >>>>>>>>>> Sorry, forgot one question.
> >>>>>>>>>>
> >>>>>>>>>> 4. Can we make value.fields-include more orthogonal? For example, one
> >>>>>>>>>> could specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> >>>>>>>>>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot
> >>>>>>>>>> configure it to ignore just the timestamp but keep the key.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Kurt
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Dawid,
> >>>>>>>>>>>
> >>>>>>>>>>> I have a couple of questions around key fields. Actually, I also have
> >>>>>>>>>>> some other questions, but I want to focus on key fields first.
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is this option
> >>>>>>>>>>> only valid during write operations? For reading, I can't imagine how
> >>>>>>>>>>> such an option could be applied. I would expect that there might be a
> >>>>>>>>>>> SYSTEM_METADATA("key") to read the key and assign it to a normal field?
> >>>>>>>>>>>
> >>>>>>>>>>> 2. If "key.fields" is only valid in write operations, I want to propose
> >>>>>>>>>>> that we simplify the options by not introducing key.format.type and
> >>>>>>>>>>> other related options. I think a single "key.field" (not fields) would
> >>>>>>>>>>> be enough; users can use a UDF to calculate whatever key they
> >>>>>>>>>>> want before the sink.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Also, I don't want to introduce "value.format.type" and
> >>>>>>>>>>> "value.format.xxx" with the "value" prefix. Not every connector has a
> >>>>>>>>>>> concept of keys and values. The old parameter "format.type" is already
> >>>>>>>>>>> good enough to use.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Kurt
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Dawid,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have two more questions.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> SupportsMetadata
> >>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I have some
> >>>>>>>>>>>> questions regarding this interface.
> >>>>>>>>>>>> 1) How does the source know the expected return type of each metadata
> >>>>>>>>>>>> field?
> >>>>>>>>>>>> 2) Where do we put the metadata fields? Append them to the existing
> >>>>>>>>>>>> physical fields?
> >>>>>>>>>>>> If yes, I would suggest changing the signature to `TableSource
> >>>>>>>>>>>> appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`
> >>>>>>>>>>>>
> >>>>>>>>>>>>> SYSTEM_METADATA("partition")
> >>>>>>>>>>>> Can the SYSTEM_METADATA() function be nested in a computed column
> >>>>>>>>>>>> expression? If yes, how do we specify the return type of
> >>>>>>>>>>>> SYSTEM_METADATA?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Jark
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. I thought a bit more about how the source would emit the columns,
> >>>>>>>>>>>>> and I now see it's not exactly the same as for regular columns. I see
> >>>>>>>>>>>>> a need to elaborate a bit more on that in the FLIP, as you asked, Jark.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I mostly agree with Danny on how we should do that. One additional
> >>>>>>>>>>>>> thing I would introduce is an
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> interface SupportsMetadata {
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> TableSource generateMetadataFields(Set<String> metadataFields);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This way the source would have to declare/emit only the requested
> >>>>>>>>>>>>> metadata fields. In order not to clash with user-defined fields, when
> >>>>>>>>>>>>> emitting a metadata field I would prefix the column name with
> >>>>>>>>>>>>> __system_, i.e. use __system_{property_name}. Therefore, when
> >>>>>>>>>>>>> SYSTEM_METADATA("partition") is requested, the source would append a
> >>>>>>>>>>>>> field __system_partition to the schema. This would never be visible to
> >>>>>>>>>>>>> the user, as it would be used only for the subsequent computed columns.
> >>>>>>>>>>>>> If that makes sense to you, I will update the FLIP with this
> >>>>>>>>>>>>> description.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here I agree with Danny. It is also the current state of the proposal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. Partitioning on computed column vs function
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here I also agree with Danny. I also think those are orthogonal. I
> >>>>>>>>>>>>> would leave the STORED computed columns out of the discussion. I don't
> >>>>>>>>>>>>> see how they relate to the partitioning. I already put both of those
> >>>>>>>>>>>>> cases in the document. We can either partition on a computed column or
> >>>>>>>>>>>>> use a UDF in a PARTITIONED BY clause. I am fine with leaving out the
> >>>>>>>>>>>>> partitioning by UDF in the first version if you still have some
> >>>>>>>>>>>>> concerns.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As for your question, Danny: it depends on which partitioning strategy
> >>>>>>>>>>>>> you use.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For the HASH partitioning strategy I thought it would work as you
> >>>>>>>>>>>>> explained. It would be N = MOD(expr, num). I am not sure, though, if we
> >>>>>>>>>>>>> should introduce the PARTITIONS clause. Usually Flink does not own the
> >>>>>>>>>>>>> data, and the partitions are already an intrinsic property of the
> >>>>>>>>>>>>> underlying source, e.g. for Kafka we do not create topics, but we just
> >>>>>>>>>>>>> describe a pre-existing, pre-partitioned topic.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am fine with changing it to timestamp.field to be consistent with
> >>>>>>>>>>>>> other value.fields and key.fields. Actually, that was also my initial
> >>>>>>>>>>>>> proposal in a first draft I prepared. I changed it afterwards to
> >>>>>>>>>>>>> shorten the key.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> >>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion. I think it is a useful
> >>>>>>>>>>>>>> feature ~
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About how the metadata is output from the source
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think it is completely orthogonal. Computed column push-down is
> >>>>>>>>>>>>>> another topic; it should not be a blocker but an enhancement. If we do
> >>>>>>>>>>>>>> not have any filters on the computed column, there is no need to do
> >>>>>>>>>>>>>> any push-down; the source node just emits the complete record with
> >>>>>>>>>>>>>> full metadata according to the declared physical schema. Then, when
> >>>>>>>>>>>>>> generating the virtual columns, we would extract the metadata info and
> >>>>>>>>>>>>>> output it as full columns (with the full schema).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About the type of metadata column
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Personally I prefer an explicit type instead of CAST. They are
> >>>>>>>>>>>>>> semantically equivalent, but the explicit type is more straightforward
> >>>>>>>>>>>>>> and we can declare the nullable attribute there.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About option A: partitioning based on a computed column VS option B:
> >>>>>>>>>>>>>> partitioning with just a function
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> From the FLIP, it seems that B's partitioning is just a strategy for
> >>>>>>>>>>>>>> writing data. The partition column is not included in the table schema,
> >>>>>>>>>>>>>> so it is of no use when reading from the table.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Compared to A, we do not need to generate the partition column when
> >>>>>>>>>>>>>> selecting from the table (only when inserting into it)
> >>>>>>>>>>>>>> - For A we can also mark the column as STORED when we want to persist
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So in my opinion they are orthogonal, and we can support both. I saw
> >>>>>>>>>>>>>> that MySQL/Oracle [1][2] suggest also defining the PARTITIONS num, and
> >>>>>>>>>>>>>> the partitions are managed under a "tablenamespace". The partition in
> >>>>>>>>>>>>>> which the record is stored is partition number N, where
> >>>>>>>>>>>>>> N = MOD(expr, num). In your design, in which partition would the
> >>>>>>>>>>>>>> record be persisted?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> >>>>>>>>>>>>>> [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>> On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>> Ad. 2 I added a section to discuss the relation to FLIP-63.
> >>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties.
> >>>>>>>>>>>>>>> Therefore you have the key.format.type.
> >>>>>>>>>>>>>>> I also considered exactly what you are suggesting (prefixing with
> >>>>>>>>>>>>>>> connector or kafka). I should've put that into an Option/Rejected
> >>>>>>>>>>>>>>> alternatives section.
> >>>>>>>>>>>>>>> I agree that timestamp, key.*, value.* are connector properties. The
> >>>>>>>>>>>>>>> reason I wanted to suggest not adding that prefix in the first version
> >>>>>>>>>>>>>>> is that actually all the properties in the WITH section are connector
> >>>>>>>>>>>>>>> properties. Even format is in the end a connector property, as some of
> >>>>>>>>>>>>>>> the sources might not have a format, imo. The benefit of not adding
> >>>>>>>>>>>>>>> the prefix is that it makes the keys a bit shorter. Imagine prefixing
> >>>>>>>>>>>>>>> all the properties with connector (or, if we go with FLINK-12557,
> >>>>>>>>>>>>>>> elasticsearch):
> >>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> >>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> >>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> >>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> >>>>>>>>>>>>>>> I am fine with doing it, though, if this is the preferred approach in
> >>>>>>>>>>>>>>> the community.
> >>>>>>>>>>>>>>> Ad in-line comments:
> >>>>>>>>>>>>>>> I forgot to update the `value.fields.include` property. It should be
> >>>>>>>>>>>>>>> value.fields-include, which I think you also suggested in the comment,
> >>>>>>>>>>>>>>> right?
> >>>>>>>>>>>>>>> As for CAST vs declaring the output type of a computed column: I think
> >>>>>>>>>>>>>>> it's better not to use CAST, but to declare the type of an expression
> >>>>>>>>>>>>>>> and later on infer the output type of SYSTEM_METADATA. The reason is
> >>>>>>>>>>>>>>> that I think this way it will be easier to implement e.g. filter
> >>>>>>>>>>>>>>> push-downs when working with the native types of the source. E.g. in
> >>>>>>>>>>>>>>> the case of Kafka's offset, I think it's better to push down long
> >>>>>>>>>>>>>>> rather than string. This could let us push down expressions like
> >>>>>>>>>>>>>>> offset > 12345 && offset < 59382. Otherwise we would have to push down
> >>>>>>>>>>>>>>> cast(offset, long) > 12345 && cast(offset, long) < 59382.
> >>>>>>>>>>>>>>> Moreover, I think we need to introduce the type for computed columns
> >>>>>>>>>>>>>>> anyway to support functions that infer the output type based on the
> >>>>>>>>>>>>>>> expected return type.
> >>>>>>>>>>>>>>> As for the computed column push-down: yes, SYSTEM_METADATA would have
> >>>>>>>>>>>>>>> to be pushed down to the source. If that is not possible, the planner
> >>>>>>>>>>>>>>> should fail. As far as I know, computed column push-down will be part
> >>>>>>>>>>>>>>> of the source rework, won't it? ;)
> >>>>>>>>>>>>>>> As for the persisted computed column: I think it is completely
> >>>>>>>>>>>>>>> orthogonal. In my current proposal you can also partition by a
> >>>>>>>>>>>>>>> computed column. The difference between using a UDF in PARTITIONED BY
> >>>>>>>>>>>>>>> vs partitioning by a computed column is that when you partition by a
> >>>>>>>>>>>>>>> computed column, this column must also be computed when reading the
> >>>>>>>>>>>>>>> table. If you use a UDF in PARTITIONED BY, the expression is computed
> >>>>>>>>>>>>>>> only when inserting into the table.
> >>>>>>>>>>>>>>> Hope this answers some of your questions. Looking forward to further
> >>>>>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> >>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion. Reading metadata
> >>>>>>>>>>>>>>>> and key-part information from the source is an important feature for
> >>>>>>>>>>>>>>>> streaming users.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP.
> >>>>>>>>>>>>>>>> I will leave my thoughts and comments here:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) +1 to use connector properties instead of introducing the HEADER
> >>>>>>>>>>>>>>>> keyword, for the reason you mentioned in the FLIP.
> >>>>>>>>>>>>>>>> 2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should
> >>>>>>>>>>>>>>>> add a section to explain the relationship between them.
> >>>>>>>>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used on the
> >>>>>>>>>>>>>>>> PARTITIONED table in this FLIP?
> >>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make
> >>>>>>>>>>>>>>>> the newly introduced properties more hierarchical?
> >>>>>>>>>>>>>>>> For example, "timestamp" => "connector.timestamp"? (Actually, I
> >>>>>>>>>>>>>>>> prefer "kafka.timestamp", which is another improvement for
> >>>>>>>>>>>>>>>> properties, see FLINK-12557.)
> >>>>>>>>>>>>>>>> A single "timestamp" in properties may mislead users into thinking
> >>>>>>>>>>>>>>>> that the field is a rowtime attribute.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I would like to propose an improvement that would enable reading
> >>>>>>>>>>>>>>>>> table columns from different parts of source records. Besides the
> >>>>>>>>>>>>>>>>> main payload, the majority (if not all) of the sources expose
> >>>>>>>>>>>>>>>>> additional information. It can be simply read-only metadata such as
> >>>>>>>>>>>>>>>>> offset or ingestion time, or read and write parts of the record that
> >>>>>>>>>>>>>>>>> contain data but additionally serve different purposes
> >>>>>>>>>>>>>>>>> (partitioning, compaction etc.), e.g. key or timestamp in Kafka.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We should make it possible to read and write data from all of those
> >>>>>>>>>>>>>>>>> locations. In this proposal I discuss reading partitioning data; for
> >>>>>>>>>>>>>>>>> completeness, this proposal also discusses the partitioning when
> >>>>>>>>>>>>>>>>> writing data out.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I am looking forward to your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> You can access the FLIP here:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Dawid
