Hey Timo and David,

I agree that backwards compatibility when evolving a schema is a good use
case +1.

Additionally, I think the DEFAULT constraint is a simple but efficient way
to ensure data consistency during ingestion. It provides an easy
alternative to failing the job or dropping records with missing fields.

Kind regards,
Gustavo

On Thu, 9 Oct 2025 at 20:56, David Anderson <[email protected]> wrote:

> Timo,
>
> > Do you have a concrete use case in mind where this feature could help?
>
> If I care about backwards compatibility when evolving a schema, giving
> DEFAULT values to new columns would be a clean approach.
>
> David
>
> On Mon, Oct 6, 2025 at 12:31 AM Timo Walther <[email protected]> wrote:
>
> > Hi David,
> >
> > the need for DEFAULT columns popped up a couple of times. And I would
> > also support the implementation. But apparently the need was not strong
> > enough for concrete resource planning for it.
> >
> > Do you have a concrete use case in mind where this feature could help?
> >
> > Regards,
> > Timo
> >
> > On 02.10.25 00:47, David Anderson wrote:
> > > I'm wishing we had column DEFAULTs as was discussed in this thread way
> > back
> > > in 2022.
> > >
> > > I found FLIP-261 [1], but it doesn't appear to have been voted on or
> > > implemented. Is that correct?
> > >
> > > [1] https://cwiki.apache.org/confluence/x/x4ueDQ
> > >
> > > Regards,
> > > David
> > >
> > > On Thu, Sep 1, 2022 at 1:32 AM Ran Tao <[email protected]> wrote:
> > >
> > >> Hi, Jark & Timo. I'm glad to support this feature, and if you guys
> > agree,
> > >> I'll be ready to create a FLIP, and then you guys and other developers
> > can
> > >> review and check some specifics.
> > >>
> > >> Thanks.
> > >>
> > >> Jark Wu <[email protected]> 于2022年8月30日周二 20:24写道:
> > >>
> > >>> Thank you Ran for the explanation.
> > >>>
> > >>> The column DEFAULT is a reasonable feature and can also help in other
> > >>> cases.
> > >>> I’m fine with adding this feature.
> > >>> Do you want to prepare a FLIP for it?
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>>> 2022年8月29日 15:02,Ran Tao <[email protected]> 写道:
> > >>>>
> > >>>> Hi Jack. Timo summed it up very well. In fact, my problem is that
> the
> > >>>> current flink table metadata is fixed and cannot be compatible with
> > the
> > >>>> connector's changes in metadata columns.
> > >>>> A metadata column that did not exist in the past, does exist at some
> > >>> point
> > >>>> in the future, and vice versa.
> > >>>> There is forward and backward compatibility here.
> > >>>>
> > >>>> Jark Wu <[email protected]> 于2022年8月26日周五 16:28写道:
> > >>>>
> > >>>>> Hi Ran,
> > >>>>>
> > >>>>> If the metadata is from the message properties, then you can
> manually
> > >>> cast
> > >>>>> it to your preferred types,
> > >>>>> such as `my_dyanmic_meta AS CAST(properties['my-new-property’] AS
> > >>>>> TIMESTAMP)`.
> > >>>>>
> > >>>>> If the metadata is not from the message properties, how does the
> > >>> connector
> > >>>>> know which field to convert from?
> > >>>>> Shouldn’t the connector be modified to support this new metadata
> > >> column?
> > >>>>>
> > >>>>> Best,
> > >>>>> Jark
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>> 2022年8月26日 15:30,Ran Tao <[email protected]> 写道:
> > >>>>>>
> > >>>>>> Hi, TiMo. I think using one map column in the debezium format you
> > >>>>>> illustrated above can't cover the discussed scenario.
> > >>>>>> It's not the same thing.
> > >>>>>>
> > >>>>>> Here is a debezium format example from flink docs: [1]
> > >>>>>>
> > >>>>>> ```
> > >>>>>> CREATE TABLE KafkaTable (
> > >>>>>> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
> > >>>>> VIRTUAL,
> > >>>>>> origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>>>> 'value.source.properties' VIRTUAL,
> > >>>>>> user_id BIGINT,
> > >>>>>> ) WITH (
> > >>>>>> 'connector' = 'kafka',
> > >>>>>> 'value.format' = 'debezium-json'
> > >>>>>> ...
> > >>>>>> );
> > >>>>>> ```
> > >>>>>>
> > >>>>>> *the `origin_properties` is a column used for properties. So we
> > >> define
> > >>> it
> > >>>>>> at MAP *(just like you respond). But the other metadata columns
> have
> > >>>>> their
> > >>>>>> own data types.
> > >>>>>> e.g. `origin_ts` is TIMESTAMP.  We can not flatmap all metadata
> > >> columns
> > >>>>>> within one MAP<STRING, STRING> column. it's not a good idea.
> > >>>>>>
> > >>>>>> My suggestion is that if kafka above *add some new metadatas*(just
> > >> for
> > >>>>>> example, kafka maybe stable, but a certain connector or middleware
> > >>> might
> > >>>>> be
> > >>>>>> developing, so its metadatas could be added or changed)
> > >>>>>> e.g. at some time, kafka added a `host_name` metadata (indicate
> the
> > >>>>> address
> > >>>>>> of message broker).
> > >>>>>>
> > >>>>>> We can define sql like this:
> > >>>>>> ```
> > >>>>>> CREATE TABLE KafkaTable (
> > >>>>>> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp'
> > >>>>> VIRTUAL,
> > >>>>>> host_name STRING METADATA VIRTUAL DYNAMIC,
> > >>>>>> origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>>>> 'value.source.properties' VIRTUAL,
> > >>>>>> user_id BIGINT,
> > >>>>>> ) WITH (
> > >>>>>> 'connector' = 'kafka',
> > >>>>>> 'value.format' = 'debezium-json'
> > >>>>>> ...
> > >>>>>> );
> > >>>>>> ```
> > >>>>>> Then users can use `host_name` this metadata, because it's a
> DYNAMIC
> > >>>>>> metacolumn, flink dont't throw exception although `host_name`
> > >>>>>> not belongs to kafka before, and the developers don't need to
> modify
> > >> or
> > >>>>>> rebuild flink source code and publish flink to online environment
> > (it
> > >>>>> comes
> > >>>>>> at a high cost).
> > >>>>>>
> > >>>>>> Considering the return value:
> > >>>>>> kafka before (no this metadata): null
> > >>>>>> kafka now (added this metadata already): the concrete value
> > >>>>>>
> > >>>>>> Same user sql works well in the past and now even in the future
> > >> rather
> > >>>>> than
> > >>>>>> check and deny these new metadata columns or modify connector
> > >>>>>> implementation frequently to support it.
> > >>>>>> And it's an option to configure by using 'DYNAMIC' at the metadata
> > >>>>>> column(or other better implementations).
> > >>>>>>
> > >>>>>> [1]
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> > >>>>>>
> > >>>>>> Timo Walther <[email protected]> 于2022年8月25日周四 21:07写道:
> > >>>>>>
> > >>>>>>> Hi Ran,
> > >>>>>>>
> > >>>>>>> what would be the data type of this dynamic metadata column? The
> > >>> planner
> > >>>>>>> and many parts of the stack will require a data type.
> > >>>>>>>
> > >>>>>>> Personally, I feel connector developers can already have the same
> > >>>>>>> functionality by declaring a metadata column as `MAP<STRING,
> > >> STRING>`.
> > >>>>>>> This is what we expose already as `debezium.source.properties`.
> > >>> Whatever
> > >>>>>>> Debezium adds will be available through this property and can be
> > >>>>>>> accessed via `SELECT col['my-new-property'] FROM x` including
> being
> > >>> NULL
> > >>>>>>> be default if not present.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 25.08.22 14:04, Ran Tao wrote:
> > >>>>>>>> ```
> > >>>>>>>> create table test_source(
> > >>>>>>>> __test_metadata__ varchar METADATA,
> > >>>>>>>> f0 varchar,
> > >>>>>>>> f1 varchar,
> > >>>>>>>> f2 bigint,
> > >>>>>>>> ts as CURRENT_TIMESTAMP
> > >>>>>>>> ) with(
> > >>>>>>>> 'connector'='test',
> > >>>>>>>>   ...
> > >>>>>>>> )
> > >>>>>>>> ```
> > >>>>>>>>
> > >>>>>>>> If we not pre define `__test_metadata__` as meta keys by
> > >> implementing
> > >>>>>>>> listReadableMetadata, run the above sql, it will cause exception
> > >> like
> > >>>>>>> this:
> > >>>>>>>>
> > >>>>>>>> org.apache.flink.table.api.ValidationException: Invalid metadata
> > >> key
> > >>>>>>>> '__test_metadata__' in column '__test_metadata__' of table
> > >>>>>>>> 'default_catalog.default_database.test_source'. The
> > >>> DynamicTableSource
> > >>>>>>>> class
> > >>> 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> > >>>>>>>> supports the following metadata keys for reading:
> > >>>>>>>> xxx, yyy
> > >>>>>>>>
> > >>>>>>>> at
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>
> > >>
> >
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> > >>>>>>>>
> > >>>>>>>> Because the current flink metadata column must exist in results
> > >>>>> returned
> > >>>>>>> by
> > >>>>>>>> `listReadableMetadata`.  But when a certain connector adds some
> > >>>>>>> metadatas,
> > >>>>>>>> we can not use it directly unless we modify this connector code
> > and
> > >>>>>>> support
> > >>>>>>>> it. In some situations, It can be intolerable. Can we support
> > >>> 'DYNAMIC
> > >>>>>>>> MetadataColumn'?  Its basic mechanism is not to check a column
> > with
> > >>>>>>>> existing metadatas and users can define it dynamically. If a
> > >> certain
> > >>>>>>>> connector without this metadata, the column value will return
> null
> > >>>>>>>> otherwise return it's concrete value. It has great benefits in
> > some
> > >>>>>>>> scenarios.
> > >>>>>>>>
> > >>>>>>>> Looking forward to your opinions.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> Best Regards,
> > >>>>>> Ran Tao
> > >>>>>> https://github.com/chucheng92
> > >>>>>
> > >>>>>
> > >>>>
> > >>>> --
> > >>>> Best Regards,
> > >>>> Ran Tao
> > >>>> https://github.com/chucheng92
> > >>>
> > >>>
> > >>
> > >> --
> > >> Best Regards,
> > >> Ran Tao
> > >> https://github.com/chucheng92
> > >>
> > >
> >
> >
>

Reply via email to