[ https://issues.apache.org/jira/browse/BEAM-7073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Skraba reassigned BEAM-7073:
---------------------------------

    Assignee: Ryan Skraba

> AvroUtils converting generic record to Beam Row causes class cast exception
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-7073
>                 URL: https://issues.apache.org/jira/browse/BEAM-7073
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.11.0
>         Environment: Direct Runner
>            Reporter: Vishwas
>            Assignee: Ryan Skraba
>            Priority: Major
>
> Below is my pipeline:
> KafkaSource (KafkaIO.read) -----------> ParDo -------> BeamSql------> KafkaSink (KafkaIO.write)
>
> The Kafka source IO reads Avro records from a Kafka topic and deserializes
> them to GenericRecord using the code below:
>
> KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
>     .withBootstrapServers(bootstrapServerUrl)
>     .withTopic(topicName)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>         AvroCoder.of(GenericRecord.class, avroSchema))
>     .updateConsumerProperties(ImmutableMap.of("schema.registry.url",
>         schemaRegistryUrl));
>
> The Avro schema of the topic has a field with a logical type
> (timestamp-millis). This field is deserialized to joda-time.
>
> {
>   "name": "timeOfRelease",
>   "type": [
>     "null",
>     {
>       "type": "long",
>       "logicalType": "timestamp-millis",
>       "connect.version": 1,
>       "connect.name": "org.apache.kafka.connect.data.Timestamp"
>     }
>   ],
>   "default": null
> }
>
> Now, in my ParDo transform, I am trying to use the AvroUtils class methods to
> convert the generic record to a Beam Row, and I get the class cast exception
> below:
>
> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>
> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>
> This looks like a bug: the joda-time value created during deserialization is
> cast to Long in the code below.
>
> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>   return convertDateTimeStrict((Long) value, fieldType);
> }
>
> PS: I also used the avro-tools 1.8.2 jar to generate the classes for the
> mentioned Avro schema, and I see that the attribute with the timestamp-millis
> logical type is converted to joda-time.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
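
The cast failure described in the report could be avoided by normalizing the decoded value before treating it as a long. The following is only a minimal sketch of that idea, not the fix applied in Beam: the class TimestampMillisSketch and the method toEpochMillis are hypothetical names, and java.time.Instant stands in for the Joda type so that the example compiles with the JDK alone. With Joda-Time on the classpath, the analogous branch would test for org.joda.time.ReadableInstant and call getMillis().

```java
import java.time.Instant;

/** Hypothetical helper for illustration; it is not part of Beam's AvroUtils. */
public final class TimestampMillisSketch {

    private TimestampMillisSketch() {}

    /**
     * Normalizes a decoded timestamp-millis value to epoch milliseconds.
     * Accepts either the raw Long or an already-converted instant object,
     * instead of unconditionally casting to Long.
     */
    public static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            // Raw Avro representation: milliseconds since the epoch.
            return (Long) value;
        }
        if (value instanceof Instant) {
            // Stand-in for a converted type. Assumption: with Joda-Time this
            // branch would check org.joda.time.ReadableInstant and use getMillis().
            return ((Instant) value).toEpochMilli();
        }
        String type = (value == null) ? "null" : value.getClass().getName();
        throw new IllegalArgumentException(
            "Unsupported timestamp-millis value type: " + type);
    }

    public static void main(String[] args) {
        long millis = 1554076800000L;
        // Both representations resolve to the same epoch value.
        System.out.println(toEpochMillis(millis));
        System.out.println(toEpochMillis(Instant.ofEpochMilli(millis)));
    }
}
```

Checking the runtime type keeps the conversion independent of whether the deserializer applied a logical-type conversion, which is the situation the stack trace above points to.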