Hi Rui,

I agree that converting it to long avoids the error. But KafkaIO gives me a GenericRecord whose attribute is of type JodaTime. So I would convert it to long, only for AvroUtils.toBeamRowStrict to convert it back to JodaTime.
I ran the avro tools 1.8.2 jar on the schema below, and the generated class has a JodaTime attribute.

{
  "name": "timeOfRelease",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis",
    "connect.version": 1,
    "connect.name": "org.apache.kafka.connect.data.Timestamp"
  }
}

*Attribute type in generated class:*
private org.joda.time.DateTime timeOfRelease;

So I am not sure why this type casting is required.

*Thanks & Regards,*
*Vishwas *

On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:

> From reading the code, it seems that, as the logical type "timestamp-millis"
> implies, it expects millis as a Long for values under this logical type.
>
> So if you can convert joda-time to millis before calling
> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
> will be gone.
>
> -Rui
>
>
> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Below is my pipeline:
>>>
>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql
>>> ---------------> KafkaSink(KafkaIO.write)
>>>
>>>
>>> The avro schema of the topic has a field of logical type
>>> timestamp-millis. The KafkaIO.read transform creates a
>>> KafkaRecord<String,GenericRecord>, in which this field is converted to
>>> joda-time.
>>>
>>> In my Pardo transform, I am trying to use the AvroUtils class methods to
>>> convert the generic record to a Beam Row, and I get the class cast
>>> exception below for the joda-time attribute.
>>>
>>> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>
>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>> be cast to java.lang.Long
>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>
>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>> for this
>>>
>>> *Thanks & Regards,*
>>>
>>> *Vishwas *
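
For reference, the workaround Rui suggests in this thread (turning the joda-time value into epoch millis before calling AvroUtils.toBeamRowStrict) might look roughly like the sketch below. It uses only the JDK, so it runs without Beam, Avro, or Joda-Time on the classpath: a plain Map stands in for the GenericRecord, and the names TimestampMillisNormalizer, toEpochMillis, normalizeField, and FakeDateTime are all hypothetical. It also assumes the field value exposes a public getMillis() method, as org.joda.time.DateTime does.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Beam, Avro, or Joda-Time.
final class TimestampMillisNormalizer {

    // Stand-in for org.joda.time.DateTime so the sketch runs on the JDK alone.
    static final class FakeDateTime {
        private final long millis;

        FakeDateTime(long millis) {
            this.millis = millis;
        }

        public long getMillis() {
            return millis;
        }
    }

    // Returns epoch millis for a value that is already a Long, or that
    // exposes a public no-argument getMillis() method (assumption: this is
    // how the joda-time value in the record behaves).
    static long toEpochMillis(Object value) {
        if (value == null) {
            throw new IllegalArgumentException("Timestamp value is null");
        }
        if (value instanceof Long) {
            return (Long) value;
        }
        try {
            Method getMillis = value.getClass().getMethod("getMillis");
            Object result = getMillis.invoke(value);
            if (result instanceof Number) {
                return ((Number) result).longValue();
            }
            throw new IllegalArgumentException("getMillis() did not return a number: " + value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot convert to epoch millis: " + value, e);
        }
    }

    // Returns a copy of the record-like map in which the named field has
    // been replaced by its epoch-millis Long; the input map is not modified.
    static Map<String, Object> normalizeField(Map<String, Object> fields, String name) {
        Map<String, Object> copy = new LinkedHashMap<>(fields);
        copy.put(name, toEpochMillis(fields.get(name)));
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("timeOfRelease", new FakeDateTime(1555372800000L));

        Map<String, Object> normalized = normalizeField(record, "timeOfRelease");
        Object value = normalized.get("timeOfRelease");
        System.out.println(value.getClass().getName() + " " + value);
    }
}
```

In the actual ParDo the same conversion would presumably be applied to the GenericRecord itself, for example genericRecord.put("timeOfRelease", millis), before the AvroUtils call. That part is untested here and does not answer the question of why the conversion is needed in the first place.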