Hi All!

We are trying to work with Avro-serialized data from Kafka using the Table
API, with a TIMESTAMP column type.

According to the docs
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
we can use the long type with logicalType: timestamp-millis,
so we use the following Avro field schema in the descriptor:


  {"name": "timestamp_field", "type": {"type":"long", "logicalType":
"timestamp-millis"}}

When trying to insert into the table we get the following error:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime
cannot be cast to class java.lang.Long (java.time.LocalDateTime and
java.lang.Long are in module java.base of loader 'bootstrap')
        at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
        at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)

It seems the Avro format (serializer) is not aware of the logical-type
conversion needed to map the value back to its physical type, long.
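To illustrate what I mean, here is a minimal sketch (my own, not Flink or
Avro code) of the conversion I would expect the writer to perform for a
timestamp-millis field: the wire value is the epoch-milliseconds long, not
the java.time.LocalDateTime object itself. I am assuming UTC as the zone
here for the sake of the example.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampMillisSketch {

    // The conversion the serializer seems to be missing: a logicalType of
    // timestamp-millis is physically a long holding millis since the epoch,
    // so the LocalDateTime must be converted before being written.
    static long toEpochMillis(LocalDateTime ts) {
        // Assuming UTC; the actual format may resolve the zone differently.
        return ts.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // One second past the epoch -> 1000 ms.
        System.out.println(toEpochMillis(LocalDateTime.of(1970, 1, 1, 0, 0, 1)));
    }
}
```

Without some registered conversion like this, GenericDatumWriter sees a
LocalDateTime where it expects a Long, which matches the ClassCastException
above.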

I looked at the AvroTypesITCase, which uses all kinds of logical types,
but I could only find logic that maps between Avro POJOs and tables,
and none that actually exercises the serialization/deserialization logic
of the format.

Could someone please help me with this? Maybe what I am trying to do
is not possible, or I just missed a crucial step.

Thank you!
Gyula
