Hi,
I'm trying to access the metadata columns from the Debezium source
connector, as documented here:
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/#available-metadata>
However, I get the following error when I select rows from the Kafka
table:

flink.table.api.ValidationException: Invalid metadata key 'value.ingestion-timestamp' in column 'origin_ts'

I get the same error for all of the *virtual* columns. Please let me know
what I'm doing wrong.

Here's my table creation query:

CREATE TABLE testFlink (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = '<BROKER_URL>:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' = '<SCHEMA_REGISTRY>:8081'
);

Thanks.
