Hi, Dale.
I think there are two options you could try.
1. As suggested in the reply on FLINK-22427 [1], use the SQL function `COALESCE`.
2. Modify the Avro format code yourself.
Option 2 takes some work. First, you need to pass the default value through
the Schema, which does not carry default values at the moment. Then you need
to modify AvroRowDataDeserializationSchema so that it returns the default
value when the field is null.
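For the first approach (`COALESCE`), a minimal sketch could look like the following. It assumes the source table `simplified-recreate` from your mail is declared without the DEFAULT clauses, with `favouritePhrase` and `favouriteNumber` as nullable columns, and that a missing value actually arrives in Flink as NULL. The view name is only a placeholder for illustration.

```sql
-- Sketch only: wraps the source table in a view that substitutes the
-- schema defaults whenever a column is NULL.
-- Assumption: `simplified-recreate` exists and its first two columns are nullable.
-- `simplified-recreate-with-defaults` is a hypothetical name.
CREATE VIEW `simplified-recreate-with-defaults` AS
SELECT
  COALESCE(`favouritePhrase`, 'Hello World') AS `favouritePhrase`,
  COALESCE(`favouriteNumber`, 42)            AS `favouriteNumber`,
  `isItTrue`
FROM `simplified-recreate`;
```

Downstream queries would then read from the view instead of the table. The defaults are repeated by hand here, so they have to be kept in sync with the Avro schema manually.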
Best,
Hang
[1] https://issues.apache.org/jira/browse/FLINK-22427
On Tue, Nov 14, 2023 at 01:33, Dale Lane wrote:
> I have a Kafka topic with events produced using an Avro schema like this:
>
> {
>   "namespace": "demo.avro",
>   "type": "record",
>   "name": "MySimplifiedRecreate",
>   "fields": [
>     {
>       "name": "favouritePhrase",
>       "type": "string",
>       "default": "Hello World"
>     },
>     {
>       "name": "favouriteNumber",
>       "type": "int",
>       "default": 42
>     },
>     {
>       "name": "isItTrue",
>       "type": "boolean"
>     }
>   ]
> }
>
> I want to use the default values in the same way that I do in other Kafka
> consumers. (Specifically, that when a message on the topic is missing a
> value for one of these properties, the default value is used instead).
>
> e.g.
>
> CREATE TABLE `simplified-recreate`
> (
>   `favouritePhrase` STRING DEFAULT 'Hello World',
>   `favouriteNumber` INT DEFAULT 42,
>   `isItTrue` BOOLEAN NOT NULL
> )
> WITH (
>   'connector' = 'kafka',
>   'format' = 'avro',
>   ...
>
> As far as I can see, DEFAULT isn’t available in Flink SQL. (Although I can
> see it was considered before in a different context -
> https://issues.apache.org/jira/browse/FLINK-22427 )
>
> Is there another way to *process events with missing properties where the
> schema identifies the correct default*?
>
> Kind regards
>
> Dale
>