Hi Harshvardhan,

The format debezium-avro-confluent doesn’t support reading metadata yet [1]. The 
supported formats include debezium-json, canal-json and maxwell-json; you could 
try one of those instead.
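
For example, the table from your original message could be declared with 
debezium-json (a sketch trimmed to a few of your metadata columns, keeping your 
topic and broker placeholders; it assumes the topic carries Debezium JSON 
records rather than Avro):

```sql
-- Same table, but with 'value.format' switched to debezium-json, which
-- supports the value.* metadata keys; the schema-registry option is not
-- needed for JSON.
CREATE TABLE testFlink (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
```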

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20454 



> On Sep 24, 2021, at 14:00, Harshvardhan Shinde <harshvardhan.shi...@oyorooms.com> wrote:
> 
> Hi,
> Here's the complete error log:
>  
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Invalid metadata key 
> 'value.ingestion-timestamp' in column 'origin_ts' of table 
> 'flink_hive.harsh_test.testflink'. The DynamicTableSource class 
> 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' 
> supports the following metadata keys for reading:
> topic
> partition
> headers
> leader-epoch
> offset
> timestamp
> timestamp-type
> 
> I'll need some more time to test it with debezium-json format.
> 
> On Fri, Sep 24, 2021 at 12:52 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi,
> could you please share the full error message?
> I think it should list the supported metadata columns.
> 
> Do you see the same error with 'debezium-json' format instead of
> 'debezium-avro-confluent' ?
> 
> Regards,
> Roman
> 
> 
> On Wed, Sep 22, 2021 at 5:12 PM Harshvardhan Shinde
> <harshvardhan.shi...@oyorooms.com> wrote:
> >
> > Hi,
> > I'm trying to access the metadata columns from the debezium source 
> > connector as documented here.
> > However I'm getting the following error when I try to select the rows from 
> > the kafka table:
> >
> > flink.table.api.ValidationException: Invalid metadata key 
> > 'value.ingestion-timestamp' in column 'origin_ts'
> >
> > Getting the same issue for all the virtual columns. Please let me know what 
> > I'm doing wrong.
> >
> > Here's my table creation query:
> >
> > CREATE TABLE testFlink (
> >   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> >   event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
> >   origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
> >   origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
> >   origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
> >   origin_properties MAP<STRING, STRING> METADATA FROM 
> > 'value.source.properties' VIRTUAL,
> >   id BIGINT,
> >   number BIGINT,
> >   created_at BIGINT,
> >   updated_at BIGINT
> > ) WITH (
> >   'connector' = 'kafka',
> >   'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
> >   'properties.bootstrap.servers' = '<BROKER_URL>:9092',
> >   'properties.group.id' = 'testGroup',
> >   'scan.startup.mode' = 'earliest-offset',
> >   'value.format' = 'debezium-avro-confluent',
> >   'value.debezium-avro-confluent.schema-registry.url' = 
> > '<SCHEMA_REGISTRY>:8081'
> > );
> >
> > Thanks.
> 
> 
> -- 
> Thanks and Regards,
> Harshvardhan
> Data Platform
