Hi!

@Arvid: We are using Avro 1.8, I believe, but this problem seems to come from the Flink side, as Dawid mentioned.

@Dawid: Sounds like a reasonable explanation, here are the actual queries to reproduce within the SQL client/table api:

CREATE TABLE source_table (
  int_field INT,
  timestamp_field TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'avro_tset',
  'connector.properties.bootstrap.servers' = '<...>',
  'format.type' = 'avro',
  'format.avro-schema' = '{
    "type": "record",
    "name": "test",
    "fields" : [
      {"name": "int_field", "type": "int"},
      {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
    ]
  }'
)

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');

And the error:

Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)

I will open a Jira ticket as well with these details.

Thank you!
Gyula

On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Gyula,
>
> I have not verified it locally yet, but I think you are hitting yet
> another problem of the unfinished migration from old TypeInformation based
> type system to the new type system based on DataTypes.
> As far as I understand the problem the information about the bridging class
> (java.sql.Timestamp in this case) is lost in the stack. Because this
> information is lost/not respected the planner produces LocalDateTime
> instead of a proper java.sql.Timestamp time. The AvroRowSerializationSchema
> expects java.sql.Timestamp for a column of TIMESTAMP type and thus it fails
> for LocalDateTime. I really hope the effort of FLIP-95 will significantly
> reduce the number of problems.
>
> It's definitely worth reporting a bug.
>
> BTW could you share how you create the Kafka Table sink to have the full
> picture?
>
> Best,
>
> Dawid
>
> On 29/04/2020 15:42, Gyula Fóra wrote:
>
> Hi All!
>
> We are trying to work with avro serialized data from Kafka using the Table
> API and use TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
> {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the avro format (serializer) is not aware of the logical type
> conversion that is needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase which uses all kinds of logical types but I
> could only find logic that maps between Avro Pojos and tables and none that
> actually uses the serialization/deserialization logic with the format.
>
> Could someone please help me with this? Maybe what I am trying to do is not
> possible, or I just missed a crucial step.
>
> Thank you!
> Gyula
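
For reference, the conversion that the serializer appears to skip can be sketched in plain Java. This is only an illustration and not Flink's or Avro's actual code: the class and method names below are hypothetical, and treating the LocalDateTime as a UTC wall-clock time is an assumption. Avro's timestamp-millis logical type is stored as a long counting milliseconds since the Unix epoch, so a LocalDateTime has to be turned into that long before a writer that applies no conversions can accept it.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink or Avro.
public class TimestampMillisSketch {

    // LocalDateTime -> physical long for "timestamp-millis".
    // Assumption: the value is interpreted as UTC.
    static long toEpochMillis(LocalDateTime ts) {
        return ts.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Physical long -> LocalDateTime, the reverse direction (also UTC).
    static LocalDateTime fromEpochMillis(long millis) {
        long seconds = Math.floorDiv(millis, 1000L);
        int nanos = (int) Math.floorMod(millis, 1000L) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Same literal as in the INSERT statement above.
        LocalDateTime ts = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        long millis = toEpochMillis(ts);
        System.out.println(millis);
        System.out.println(fromEpochMillis(millis));
    }
}
```

Passing the resulting long instead of the LocalDateTime is what a plain GenericDatumWriter would need for a field declared as {"type":"long", "logicalType": "timestamp-millis"}; whether the fix belongs in the planner or in AvroRowSerializationSchema is a separate question for the Jira ticket.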