Hi Rui, I checked the AvroUtils code. There is a static intializer block basically it registers Avro Timestamp Conversion functions for logical type timestamp-millis.
*// Code Snippet below* static { // This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling // of DateTime types. SpecificData.get().addLogicalTypeConversion(new TimeConversions. TimestampConversion()); GenericData.get().addLogicalTypeConversion(new TimeConversions. TimestampConversion()); } Because of this when deserializing generic record from kafka using KafkaAvroDeserializer, the long value produced at the producer end gets converted to joda-time during deserialization. Next when we try to convert this genericRecord to Row as part of AvroUtils.toBeamRowStrict function, we again try to convert the value recieved to joda-time. But the exception is thrown as there is type cast to Long. *// Code Snippet Below:* else if (logicalType instanceof LogicalTypes.TimestampMillis) { return convertDateTimeStrict((Long) value, fieldType); *<-- Class cast exception is thrown here, as we are typecasting from JodaTime to Long* } private static Object convertDateTimeStrict (Long value, Schema.FieldType fieldType) { checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime "); return new Instant(value); <-- *Creates a JodaTime Instance here* } *Thanks & Regards,* *Vishwas * On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote: > I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to > Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from > GenericRecord, and tries to cast objects based on their types (and > cast(object) to long for "timestamp-millis"). see [1]. > > So in order to use `AvroUtils.toBeamRowStrict`, the generated > GenericRecord should have long for "timestamp-millis". > > The schema you pasted looks right. Not sure why generated class is Joda > time (is it controlled by some flags?). But at least you could write a > small function to do schema conversion for your need. > > [1] > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672 > > > Rui > > > On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote: > >> Hi Rui, >> >> I agree that by converting it to long, there will be no error. >> But the KafkaIO is giving a GenericRecord with attribute of type >> JodaTime. Now I convert it to long. Then in the AvroUtils.toBeamRowStrict >> again converts it to JodaTime. >> >> I used the avro tools 1.8.2 jar, for the below schema and I see that the >> generated class has a JodaTime attribute. >> >> { >> "name": "timeOfRelease", >> "type": >> { >> "type": "long", >> "logicalType": "timestamp-millis", >> "connect.version": 1, >> "connect.name": >> "org.apache.kafka.connect.data.Timestamp" >> } >> } >> >> *Attribute type in generated class:* >> private org.joda.time.DateTime timeOfRelease; >> >> >> So not sure why this type casting is required. >> >> >> *Thanks & Regards,* >> >> *Vishwas * >> >> >> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote: >> >>> Read from the code and seems like as the logical type "timestamp-millis" >>> means, it's expecting millis in Long as values under this logical type. >>> >>> So if you can convert joda-time to millis before calling >>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception >>> will gone. >>> >>> -Rui >>> >>> >>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote: >>> >>>> +dev <d...@beam.apache.org> >>>> >>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> Below is my pipeline: >>>>> >>>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql >>>>> ---------------> KafkaSink(KafkaIO.write) >>>>> >>>>> >>>>> The avro schema of the topic has a field of logical type >>>>> timestamp-millis. KafkaIO.read transform is creating a >>>>> KafkaRecord<String,GenericRecord>, where this field is being converted to >>>>> joda-time. >>>>> >>>>> In my Pardo transform, I am trying to use the AvroUtils class methods >>>>> to convert the generic record to Beam Row and getting below class cast >>>>> exception for the joda-time attribute. >>>>> >>>>> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema) >>>>> >>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot >>>>> be cast to java.lang.Long >>>>> at >>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664) >>>>> at >>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217) >>>>> >>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073 >>>>> for this >>>>> >>>>> >>>>> >>>>> *Thanks & Regards,* >>>>> >>>>> *Vishwas * >>>>> >>>>>