Hi Ran,

If the metadata is from the message properties, then you can manually cast it 
to your preferred types, 
such as `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.

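For example, here is a minimal sketch of that cast as a full table definition. It assumes the connector exposes the properties map under `value.source.properties`, as in the Debezium example quoted below; the property key `my-new-property` and the column names are hypothetical, and the remaining connector options are left out.

```sql
CREATE TABLE KafkaTable (
  -- MAP-typed metadata column (metadata key as in the Debezium example quoted below).
  properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  -- Computed column: look up the hypothetical key and cast it manually.
  -- The lookup yields NULL when the key is not present in the map.
  my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP(3)),
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json'
  -- remaining connector options omitted
);
```
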
If the metadata is not from the message properties, how does the connector know 
which field to convert from? 
Shouldn’t the connector be modified to support this new metadata column?

Best,
Jark



> On Aug 26, 2022, at 15:30, Ran Tao <chucheng...@gmail.com> wrote:
> 
> Hi, Timo. I think a single MAP column, as in the Debezium format you
> described above, can't cover the scenario under discussion.
> It's not the same thing.
> 
> Here is a Debezium format example from the Flink docs [1]:
> 
> ```
> CREATE TABLE KafkaTable (
>  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>  user_id BIGINT
> ) WITH (
>  'connector' = 'kafka',
>  'value.format' = 'debezium-json'
>  ...
> );
> ```
> 
> The `origin_properties` column is used for properties, so we define it
> as a MAP (just as you suggested). But the other metadata columns have their
> own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten all
> metadata columns into one MAP<STRING, STRING> column; that is not a good idea.
> 
> My suggestion addresses the case where Kafka above *adds some new metadata*
> (just as an example: Kafka may be stable, but another connector or
> middleware might still be under development, so its metadata could be
> added or changed). Say that at some point Kafka added a `host_name`
> metadata key (indicating the address of the message broker).
> 
> We could then define the SQL like this:
> ```
> CREATE TABLE KafkaTable (
>  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>  host_name STRING METADATA VIRTUAL DYNAMIC,
>  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
>  user_id BIGINT
> ) WITH (
>  'connector' = 'kafka',
>  'value.format' = 'debezium-json'
>  ...
> );
> ```
> Users can then use the `host_name` metadata. Because it is a DYNAMIC
> metadata column, Flink doesn't throw an exception even though `host_name`
> did not belong to Kafka before, and developers don't need to modify or
> rebuild the Flink source code and redeploy Flink to the production
> environment (which comes at a high cost).
> 
> Considering the return value:
> Kafka before (without this metadata): null
> Kafka now (with this metadata added): the concrete value
> 
> The same user SQL works in the past, now, and in the future, rather than
> checking and rejecting these new metadata columns or frequently modifying
> the connector implementation to support them.
> And it is opt-in: it is enabled by using 'DYNAMIC' on the metadata
> column (or some other, better implementation).
> 
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> 
> On Thu, Aug 25, 2022 at 21:07, Timo Walther <twal...@apache.org> wrote:
> 
>> Hi Ran,
>> 
>> what would be the data type of this dynamic metadata column? The planner
>> and many parts of the stack will require a data type.
>> 
>> Personally, I feel connector developers can already have the same
>> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
>> This is what we expose already as `debezium.source.properties`. Whatever
>> Debezium adds will be available through this property and can be
>> accessed via `SELECT col['my-new-property'] FROM x` including being NULL
>> by default if not present.
>> 
>> Regards,
>> Timo
>> 
>> 
>> On 25.08.22 14:04, Ran Tao wrote:
>>> ```
>>> create table test_source(
>>>  __test_metadata__ varchar METADATA,
>>>  f0 varchar,
>>>  f1 varchar,
>>>  f2 bigint,
>>>  ts as CURRENT_TIMESTAMP
>>> ) with(
>>>  'connector'='test',
>>>   ...
>>> )
>>> ```
>>> 
>>> If we do not predefine `__test_metadata__` as a metadata key by implementing
>>> `listReadableMetadata`, running the above SQL causes an exception like this:
>>> 
>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
>>> '__test_metadata__' in column '__test_metadata__' of table
>>> 'default_catalog.default_database.test_source'. The DynamicTableSource
>>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
>>> supports the following metadata keys for reading:
>>> xxx, yyy
>>> 
>>> at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
>>> 
>>> This is because a Flink metadata column must currently exist in the results
>>> returned by `listReadableMetadata`. But when a certain connector adds some
>>> metadata, we cannot use it directly unless we modify the connector code to
>>> support it. In some situations, this can be intolerable. Can we support a
>>> 'DYNAMIC MetadataColumn'? Its basic mechanism is to skip checking the column
>>> against the existing metadata keys so that users can define it dynamically.
>>> If a connector does not have this metadata, the column value is null;
>>> otherwise it is the concrete value. This has great benefits in some
>>> scenarios.
>>> 
>>> Looking forward to your opinions.
>>> 
>>> 
>> 
>> 
> 
> -- 
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
