I wish we had column DEFAULTs, as was discussed in this thread way back
in 2022.

I found FLIP-261 [1], but it doesn't appear to have been voted on or
implemented. Is that correct?
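
For reference, column DEFAULTs would presumably allow DDL along these
lines (hypothetical syntax, since the FLIP was never implemented and
Flink rejects a DEFAULT clause today):

```
CREATE TABLE orders (
  order_id BIGINT,
  -- hypothetical: a DEFAULT clause is not accepted by Flink SQL today
  status STRING DEFAULT 'pending'
);
```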

[1] https://cwiki.apache.org/confluence/x/x4ueDQ

Regards,
David

On Thu, Sep 1, 2022 at 1:32 AM Ran Tao <[email protected]> wrote:

> Hi Jark & Timo. I'd be glad to work on this feature. If you agree, I'll
> create a FLIP, and then you and other developers can review and check the
> specifics.
>
> Thanks.
>
> Jark Wu <[email protected]> 于2022年8月30日周二 20:24写道:
>
> > Thank you Ran for the explanation.
> >
> > The column DEFAULT is a reasonable feature and can also help in other
> > cases.
> > I’m fine with adding this feature.
> > Do you want to prepare a FLIP for it?
> >
> > Best,
> > Jark
> >
> > > On Aug 29, 2022 at 15:02, Ran Tao <[email protected]> wrote:
> > >
> > > Hi Jark. Timo summed it up very well. In fact, my problem is that the
> > > current Flink table metadata is fixed and cannot accommodate changes in
> > > a connector's metadata columns.
> > > A metadata column that did not exist in the past may exist at some point
> > > in the future, and vice versa.
> > > So there is a forward- and backward-compatibility concern here.
> > >
> > > On Fri, Aug 26, 2022 at 16:28, Jark Wu <[email protected]> wrote:
> > >
> > >> Hi Ran,
> > >>
> > >> If the metadata is from the message properties, then you can manually
> > >> cast it to your preferred type, such as
> > >> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
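> > >>
> > >> In full DDL context that pattern would look roughly like the following
> > >> (the metadata key, connector, and column names here are illustrative,
> > >> not a specific connector's actual keys):
> > >>
> > >> ```
> > >> CREATE TABLE t (
> > >>   properties MAP<STRING, STRING> METADATA VIRTUAL,
> > >>   my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP(3))
> > >> ) WITH (
> > >>   'connector' = '...'
> > >> );
> > >> ```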
> > >>
> > >> If the metadata is not from the message properties, how does the
> > >> connector know which field to convert from?
> > >> Shouldn’t the connector be modified to support this new metadata
> > >> column?
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>
> > >>> On Aug 26, 2022 at 15:30, Ran Tao <[email protected]> wrote:
> > >>>
> > >>> Hi, Timo. I don't think a single MAP column, as in the Debezium
> > >>> format you illustrated above, can cover the scenario under discussion.
> > >>> It's not the same thing.
> > >>>
> > >>> Here is a Debezium format example from the Flink docs: [1]
> > >>>
> > >>> ```
> > >>> CREATE TABLE KafkaTable (
> > >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> > >>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>     'value.source.properties' VIRTUAL,
> > >>>   user_id BIGINT
> > >>> ) WITH (
> > >>>   'connector' = 'kafka',
> > >>>   'value.format' = 'debezium-json'
> > >>>   ...
> > >>> );
> > >>> ```
> > >>>
> > >>> The `origin_properties` column is used for properties, so we define it
> > >>> as a MAP (just as you suggested). But the other metadata columns have
> > >>> their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot flatten
> > >>> all metadata columns into one MAP<STRING, STRING> column; it's not a
> > >>> good idea.
> > >>>
> > >>> My suggestion addresses the case where Kafka above *adds some new
> > >>> metadata* (just as an example; Kafka may be stable, but a certain
> > >>> connector or middleware might still be developing, so its metadata
> > >>> could be added or changed).
> > >>> Suppose, for example, that Kafka at some point added a `host_name`
> > >>> metadata key indicating the address of the message broker.
> > >>>
> > >>> We could then define SQL like this:
> > >>> ```
> > >>> CREATE TABLE KafkaTable (
> > >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> > >>>   host_name STRING METADATA VIRTUAL DYNAMIC,
> > >>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>     'value.source.properties' VIRTUAL,
> > >>>   user_id BIGINT
> > >>> ) WITH (
> > >>>   'connector' = 'kafka',
> > >>>   'value.format' = 'debezium-json'
> > >>>   ...
> > >>> );
> > >>> ```
> > >>> Users could then use the `host_name` metadata directly. Because it is
> > >>> a DYNAMIC metadata column, Flink would not throw an exception even
> > >>> though `host_name` did not belong to Kafka before, and developers
> > >>> would not need to modify or rebuild the Flink source code and publish
> > >>> it to the online environment (which comes at a high cost).
> > >>>
> > >>> Considering the return value:
> > >>> Kafka before (without this metadata): NULL
> > >>> Kafka now (with this metadata added): the concrete value
> > >>>
> > >>> The same user SQL would work in the past, now, and even in the future,
> > >>> rather than Flink checking and rejecting these new metadata columns or
> > >>> the connector implementation being modified frequently to support
> > >>> them. And it would be opt-in, configured with the 'DYNAMIC' keyword on
> > >>> the metadata column (or some better implementation).
> > >>>
> > >>> [1]
> > >>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> > >>>
> > >>> On Thu, Aug 25, 2022 at 21:07, Timo Walther <[email protected]> wrote:
> > >>>
> > >>>> Hi Ran,
> > >>>>
> > >>>> what would be the data type of this dynamic metadata column? The
> > >>>> planner and many parts of the stack will require a data type.
> > >>>>
> > >>>> Personally, I feel connector developers can already get the same
> > >>>> functionality by declaring a metadata column as `MAP<STRING,
> > >>>> STRING>`. This is what we already expose as
> > >>>> `debezium.source.properties`. Whatever Debezium adds will be
> > >>>> available through this property and can be accessed via
> > >>>> `SELECT col['my-new-property'] FROM x`, including being NULL by
> > >>>> default if not present.
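> > >>>>
> > >>>> Sketched end-to-end, that lookup pattern would look like this (the
> > >>>> table and column names are illustrative):
> > >>>>
> > >>>> ```
> > >>>> CREATE TABLE x (
> > >>>>   col MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> > >>>>   user_id BIGINT
> > >>>> ) WITH (
> > >>>>   'connector' = 'kafka',
> > >>>>   'value.format' = 'debezium-json'
> > >>>> );
> > >>>>
> > >>>> -- Returns the value when Debezium supplies the key, NULL otherwise.
> > >>>> SELECT col['my-new-property'] FROM x;
> > >>>> ```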
> > >>>>
> > >>>> Regards,
> > >>>> Timo
> > >>>>
> > >>>>
> > >>>> On 25.08.22 14:04, Ran Tao wrote:
> > >>>>> ```
> > >>>>> create table test_source(
> > >>>>>   __test_metadata__ varchar METADATA,
> > >>>>>   f0 varchar,
> > >>>>>   f1 varchar,
> > >>>>>   f2 bigint,
> > >>>>>   ts as CURRENT_TIMESTAMP
> > >>>>> ) with(
> > >>>>>   'connector'='test',
> > >>>>>   ...
> > >>>>> )
> > >>>>> ```
> > >>>>>
> > >>>>> If we do not pre-define `__test_metadata__` as a metadata key by
> > >>>>> implementing `listReadableMetadata` and run the above SQL, it will
> > >>>>> cause an exception like this:
> > >>>>>
> > >>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
> > >>>>> '__test_metadata__' in column '__test_metadata__' of table
> > >>>>> 'default_catalog.default_database.test_source'. The
> > >>>>> DynamicTableSource class
> > >>>>> 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> > >>>>> supports the following metadata keys for reading:
> > >>>>> xxx, yyy
> > >>>>>
> > >>>>> at
> > >>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> > >>>>>
> > >>>>> This happens because a Flink metadata column must currently exist in
> > >>>>> the results returned by `listReadableMetadata`. When a certain
> > >>>>> connector adds new metadata, we cannot use it directly unless we
> > >>>>> modify the connector code to support it, which in some situations is
> > >>>>> intolerable. Can we support a 'DYNAMIC' metadata column? Its basic
> > >>>>> mechanism would be to skip validating the column against the
> > >>>>> existing metadata keys, so users can define it dynamically. If a
> > >>>>> certain connector does not provide the metadata, the column would
> > >>>>> return NULL; otherwise it would return the concrete value. This has
> > >>>>> great benefits in some scenarios.
> > >>>>>
> > >>>>> Looking forward to your opinions.
> > >>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>> --
> > >>> Best Regards,
> > >>> Ran Tao
> > >>> https://github.com/chucheng92
> > >>
> > >>
> > >
> > > --
> > > Best Regards,
> > > Ran Tao
> > > https://github.com/chucheng92
> >
> >
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>
