Hey Timo and David,

I agree that backwards compatibility when evolving a schema is a good use case, +1.

Additionally, I think the DEFAULT constraint is a simple but effective way to ensure data consistency during ingestion. It provides an easy alternative to failing the job or dropping records with missing fields.
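To make this concrete, here is a rough sketch of what such a declaration could look like (hypothetical table and column names, borrowing the standard SQL DEFAULT syntax; the final FLIP-261 design may differ):

```
-- Hypothetical sketch: 'region' is a column added while evolving the
-- schema. Records written before the change carry no such field, and
-- instead of failing the job or dropping those records during
-- ingestion, the reader falls back to the declared default.
CREATE TABLE Orders (
  order_id BIGINT,
  amount DOUBLE,
  region STRING DEFAULT 'unknown'
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
  ...
);
```

With something like this, adding `region` to the schema would not break jobs that still read older records.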
Kind regards,
Gustavo

On Thu, 9 Oct 2025 at 20:56, David Anderson <[email protected]> wrote:

> Timo,
>
> > Do you have a concrete use case in mind where this feature could help?
>
> If I care about backwards compatibility when evolving a schema, giving
> DEFAULT values to new columns would be a clean approach.
>
> David
>
> On Mon, Oct 6, 2025 at 12:31 AM Timo Walther <[email protected]> wrote:
>
> > Hi David,
> >
> > the need for DEFAULT columns popped up a couple of times, and I would
> > also support the implementation. But apparently the need was not
> > strong enough for concrete resource planning.
> >
> > Do you have a concrete use case in mind where this feature could help?
> >
> > Regards,
> > Timo
> >
> > On 02.10.25 00:47, David Anderson wrote:
> > > I'm wishing we had column DEFAULTs, as was discussed in this thread
> > > way back in 2022.
> > >
> > > I found FLIP-261 [1], but it doesn't appear to have been voted on or
> > > implemented. Is that correct?
> > >
> > > [1] https://cwiki.apache.org/confluence/x/x4ueDQ
> > >
> > > Regards,
> > > David
> > >
> > > On Thu, Sep 1, 2022 at 1:32 AM Ran Tao <[email protected]> wrote:
> > >
> > >> Hi Jark & Timo. I'm glad to support this feature. If you agree, I'll
> > >> be ready to create a FLIP, and then you and other developers can
> > >> review and check the specifics.
> > >>
> > >> Thanks.
> > >>
> > >> On Tue, Aug 30, 2022 at 20:24, Jark Wu <[email protected]> wrote:
> > >>
> > >>> Thank you, Ran, for the explanation.
> > >>>
> > >>> The column DEFAULT is a reasonable feature and can also help in
> > >>> other cases. I'm fine with adding this feature.
> > >>> Do you want to prepare a FLIP for it?
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>>> On Aug 29, 2022, at 15:02, Ran Tao <[email protected]> wrote:
> > >>>>
> > >>>> Hi Jark. Timo summed it up very well. In fact, my problem is that
> > >>>> the current Flink table metadata is fixed and cannot accommodate
> > >>>> changes to a connector's metadata columns.
> > >>>> A metadata column that did not exist in the past may exist at some
> > >>>> point in the future, and vice versa.
> > >>>> There is forward and backward compatibility to consider here.
> > >>>>
> > >>>> On Fri, Aug 26, 2022 at 16:28, Jark Wu <[email protected]> wrote:
> > >>>>
> > >>>>> Hi Ran,
> > >>>>>
> > >>>>> If the metadata comes from the message properties, then you can
> > >>>>> manually cast it to your preferred type, such as
> > >>>>> `my_dynamic_meta AS CAST(properties['my-new-property'] AS
> > >>>>> TIMESTAMP)`.
> > >>>>>
> > >>>>> If the metadata is not from the message properties, how does the
> > >>>>> connector know which field to convert from?
> > >>>>> Shouldn't the connector be modified to support this new metadata
> > >>>>> column?
> > >>>>>
> > >>>>> Best,
> > >>>>> Jark
> > >>>>>
> > >>>>>> On Aug 26, 2022, at 15:30, Ran Tao <[email protected]> wrote:
> > >>>>>>
> > >>>>>> Hi, Timo. I think using one MAP column, as in the Debezium format
> > >>>>>> you illustrated above, can't cover the discussed scenario.
> > >>>>>> It's not the same thing.
> > >>>>>>
> > >>>>>> Here is a Debezium format example from the Flink docs: [1]
> > >>>>>>
> > >>>>>> ```
> > >>>>>> CREATE TABLE KafkaTable (
> > >>>>>>   origin_ts TIMESTAMP(3) METADATA FROM
> > >>>>>>     'value.ingestion-timestamp' VIRTUAL,
> > >>>>>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>>>>     'value.source.properties' VIRTUAL,
> > >>>>>>   user_id BIGINT
> > >>>>>> ) WITH (
> > >>>>>>   'connector' = 'kafka',
> > >>>>>>   'value.format' = 'debezium-json'
> > >>>>>>   ...
> > >>>>>> );
> > >>>>>> ```
> > >>>>>>
> > >>>>>> The `origin_properties` column is used for properties, so we
> > >>>>>> define it as a MAP (just like you suggested). But the other
> > >>>>>> metadata columns have their own data types, e.g. `origin_ts` is
> > >>>>>> TIMESTAMP. We cannot flatten all metadata columns into one
> > >>>>>> MAP<STRING, STRING> column; that's not a good idea.
> > >>>>>>
> > >>>>>> My suggestion covers the case where the Kafka connector above
> > >>>>>> *adds some new metadata* (just as an example; Kafka may be stable,
> > >>>>>> but a certain connector or middleware might still be under
> > >>>>>> development, so its metadata could be added or changed).
> > >>>>>> E.g. at some point Kafka adds a `host_name` metadata key
> > >>>>>> (indicating the address of the message broker).
> > >>>>>>
> > >>>>>> We could then define SQL like this:
> > >>>>>> ```
> > >>>>>> CREATE TABLE KafkaTable (
> > >>>>>>   origin_ts TIMESTAMP(3) METADATA FROM
> > >>>>>>     'value.ingestion-timestamp' VIRTUAL,
> > >>>>>>   host_name STRING METADATA VIRTUAL DYNAMIC,
> > >>>>>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>>>>     'value.source.properties' VIRTUAL,
> > >>>>>>   user_id BIGINT
> > >>>>>> ) WITH (
> > >>>>>>   'connector' = 'kafka',
> > >>>>>>   'value.format' = 'debezium-json'
> > >>>>>>   ...
> > >>>>>> );
> > >>>>>> ```
> > >>>>>> Then users can use the `host_name` metadata. Because it is a
> > >>>>>> DYNAMIC metadata column, Flink doesn't throw an exception even
> > >>>>>> though `host_name` did not belong to Kafka before, and developers
> > >>>>>> don't need to modify or rebuild the Flink source code and
> > >>>>>> redeploy Flink to the online environment (which comes at a high
> > >>>>>> cost).
> > >>>>>>
> > >>>>>> Considering the return value:
> > >>>>>> Kafka before (metadata not present): null
> > >>>>>> Kafka now (metadata added): the concrete value
> > >>>>>>
> > >>>>>> The same user SQL works in the past, now, and even in the future,
> > >>>>>> rather than rejecting these new metadata columns or frequently
> > >>>>>> modifying the connector implementation to support them.
> > >>>>>> And it's opt-in, configured by using 'DYNAMIC' on the metadata
> > >>>>>> column (or some other, better implementation).
> > >>>>>>
> > >>>>>> [1]
> > >>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> > >>>>>>
> > >>>>>> On Thu, Aug 25, 2022 at 21:07, Timo Walther <[email protected]>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Ran,
> > >>>>>>>
> > >>>>>>> what would be the data type of this dynamic metadata column? The
> > >>>>>>> planner and many parts of the stack will require a data type.
> > >>>>>>>
> > >>>>>>> Personally, I feel connector developers can already get the same
> > >>>>>>> functionality by declaring a metadata column as
> > >>>>>>> `MAP<STRING, STRING>`. This is what we expose already as
> > >>>>>>> `debezium.source.properties`.
> > >>>>>>> Whatever Debezium adds will be available through this property
> > >>>>>>> and can be accessed via `SELECT col['my-new-property'] FROM x`,
> > >>>>>>> including being NULL by default if not present.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> On 25.08.22 14:04, Ran Tao wrote:
> > >>>>>>>> ```
> > >>>>>>>> create table test_source(
> > >>>>>>>>   __test_metadata__ varchar METADATA,
> > >>>>>>>>   f0 varchar,
> > >>>>>>>>   f1 varchar,
> > >>>>>>>>   f2 bigint,
> > >>>>>>>>   ts as CURRENT_TIMESTAMP
> > >>>>>>>> ) with(
> > >>>>>>>>   'connector'='test',
> > >>>>>>>>   ...
> > >>>>>>>> )
> > >>>>>>>> ```
> > >>>>>>>>
> > >>>>>>>> If we do not pre-define `__test_metadata__` as a metadata key by
> > >>>>>>>> implementing listReadableMetadata, running the above SQL causes
> > >>>>>>>> an exception like this:
> > >>>>>>>>
> > >>>>>>>> org.apache.flink.table.api.ValidationException: Invalid metadata
> > >>>>>>>> key '__test_metadata__' in column '__test_metadata__' of table
> > >>>>>>>> 'default_catalog.default_database.test_source'. The
> > >>>>>>>> DynamicTableSource class
> > >>>>>>>> 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> > >>>>>>>> supports the following metadata keys for reading:
> > >>>>>>>> xxx, yyy
> > >>>>>>>>
> > >>>>>>>> at
> > >>>>>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> > >>>>>>>>
> > >>>>>>>> This is because a Flink metadata column must currently exist in
> > >>>>>>>> the results returned by `listReadableMetadata`. So when a
> > >>>>>>>> certain connector adds some metadata, we cannot use it directly
> > >>>>>>>> unless we modify the connector code to support it. In some
> > >>>>>>>> situations this is intolerable. Can we support a 'DYNAMIC
> > >>>>>>>> metadata column'? Its basic mechanism is to not check the
> > >>>>>>>> column against the existing metadata, so users can define it
> > >>>>>>>> dynamically. If a certain connector lacks this metadata, the
> > >>>>>>>> column value returns null; otherwise it returns the concrete
> > >>>>>>>> value. This has great benefits in some scenarios.
> > >>>>>>>>
> > >>>>>>>> Looking forward to your opinions.
> > >>>>>>
> > >>>>>> --
> > >>>>>> Best Regards,
> > >>>>>> Ran Tao
> > >>>>>> https://github.com/chucheng92
> > >>>>
> > >>>> --
> > >>>> Best Regards,
> > >>>> Ran Tao
> > >>>> https://github.com/chucheng92
> > >>
> > >> --
> > >> Best Regards,
> > >> Ran Tao
> > >> https://github.com/chucheng92
