Hi all,
I'm trying to ingest change data capture (CDC) data from Kafka, which
contains some timestamps.
I'm using Flink SQL, and I'm running into issues, specifically with the
created_at field.
In Postgres, it is of type 'timestamptz'.

My table definition is this:
CREATE TABLE contacts (
  contact_id STRING,
  first_name STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'film.public.contacts',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://redpanda:8081',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'redpanda:29092',
  'properties.group.id' = 'analytics'
);

And the data looks something like this:
....
"after": {
  "film.public.contacts.Value": {
    "contact_id": "51d43c3a-4c82-4418-a779-4cb0a1864fd0",
    "created_at": {
      "string": "2023-03-06T11:00:17.447018Z"
    },
    "first_name": "hank"
  }
},
"before": {
  "film.public.contacts.Value": {
    "contact_id": "51d43c3a-4c82-4418-a779-4cb0a1864fd0",
    "created_at": {
      "string": "2023-03-06T11:00:17.447018Z"
    },
    "first_name": "bart"
  }
},
"op": "u",
"source": {
  "connector": "postgresql"
}
....
The timestamps appear to have microsecond precision, which would correspond
to timestamptz(6), and I've heard that isn't supported (issue:
https://issues.apache.org/jira/browse/FLINK-23589).
For my use case the microsecond precision isn't a big deal, I'd just like
to be able to parse it at all.

Right now I'm getting this exception: 'Caused by:
org.apache.avro.AvroTypeException: Found string, expecting union'

Any known workarounds? Can I parse it 'manually' using a UDF? If I omit the
created_at field in my query it works fine, but I do need it.
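
To make the 'manual' option concrete, here is a minimal sketch of the parsing
step such a UDF might perform, written as plain Java with no Flink dependency.
The class name CreatedAtParser and its parse method are hypothetical, and the
sketch assumes the field arrives as an ISO-8601 string in UTC ending in 'Z',
as in the sample above. It truncates to milliseconds to match TIMESTAMP(3).
Wiring this into Flink (for example, declaring created_at as STRING and
calling the logic from a scalar function) is not shown and is untested.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration; not part of Flink or Debezium.
final class CreatedAtParser {
    private CreatedAtParser() {}

    // Parses an ISO-8601 UTC instant such as "2023-03-06T11:00:17.447018Z"
    // and drops everything below millisecond precision.
    // Returns null for null input so nullable columns pass through.
    static LocalDateTime parse(String value) {
        if (value == null) {
            return null;
        }
        return Instant.parse(value)              // assumes a trailing 'Z' (UTC)
                .truncatedTo(ChronoUnit.MILLIS)  // microseconds -> milliseconds
                .atOffset(ZoneOffset.UTC)
                .toLocalDateTime();
    }
}
```

Calling CreatedAtParser.parse on the sample value gives a LocalDateTime with
the microsecond part cut off at .447, which is all I need for my use case.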

regards, Frank
