Hi Harshvardhan,

The debezium-avro-confluent format doesn't support reading metadata yet [1]. The formats that currently support it are debezium-json, canal-json, and maxwell-json, so you can try one of those instead.
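As a rough sketch only, this is how your table could look with debezium-json. It assumes the topic holds JSON-encoded Debezium records rather than Avro ones, so '<JSON_TOPIC>' is a hypothetical placeholder and not your existing topic. I have not run it against your setup.

```sql
-- Sketch: same columns as the original table, reading metadata via debezium-json.
-- '<JSON_TOPIC>' is a hypothetical placeholder for a topic with JSON-encoded Debezium records.
CREATE TABLE testFlinkJson (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = '<JSON_TOPIC>',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  -- No schema registry option is needed for the JSON format.
  -- If the messages embed the Debezium schema, you may also need
  -- 'value.debezium-json.schema-include' = 'true'.
  'value.format' = 'debezium-json'
);
```

The 'value.' prefix on the metadata keys is kept because the format is configured through 'value.format'.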
Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-20454

> On Sep 24, 2021, at 14:00, Harshvardhan Shinde <harshvardhan.shi...@oyorooms.com> wrote:
>
> Hi,
> Here's the complete error log:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts' of table 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
>
> topic
> partition
> headers
> leader-epoch
> offset
> timestamp
> timestamp-type
>
> I'll need some more time to test it with debezium-json format.
>
> On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
>
> Do you see the same error with 'debezium-json' format instead of
> 'debezium-avro-confluent'?
>
> Regards,
> Roman
>
>
> On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde <harshvardhan.shi...@oyorooms.com> wrote:
> >
> > Hi,
> > I'm trying to access the metadata columns from the debezium source connector as documented here.
> > However I'm getting the following error when I try to select the rows from the kafka table:
> >
> > flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'
> >
> > Getting the same issue for all the virtual columns. Please let me know what I'm doing wrong.
> >
> > Here's my table creation query:
> >
> > CREATE TABLE testFlink (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
> >   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
> >   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
> >   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
> >   origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
> >   id BIGINT,
> >   number BIGINT,
> >   created_at BIGINT,
> >   updated_at BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
> >   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
> >   'properties.group.id' = 'testGroup',
> >   'scan.startup.mode' = 'earliest-offset',
> >   'value.format' = 'debezium-avro-confluent',
> >   'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISRTY>:8081'
> > );
> >
> > Thanks.
>
>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform