Hi Rui,

I agree that by converting it to long, there will be no error.
But the KafkaIO is giving a GenericRecord with attribute of type JodaTime.
Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict again
converts it to JodaTime.

I used the avro tools 1.8.2 jar, for the below schema and I see that the
generated class has a JodaTime attribute.

{
            "name": "timeOfRelease",
            "type":
                {
                    "type": "long",
                    "logicalType": "timestamp-millis",
                    "connect.version": 1,
                    "connect.name":
"org.apache.kafka.connect.data.Timestamp"
                }
 }

*Attribute type in generated class:*
private org.joda.time.DateTime timeOfRelease;


So not sure why this type casting is required.


*Thanks & Regards,*

*Vishwas *


On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:

> Read from the code and seems like as the logical type "timestamp-millis"
> means, it's expecting millis in Long as values under this logical type.
>
> So if you can convert joda-time to millis before calling
> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
> will gone.
>
> -Rui
>
>
> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> +dev <d...@beam.apache.org>
>>
>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Below is my pipeline:
>>>
>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql
>>> ---------------> KafkaSink(KafkaIO.write)
>>>
>>>
>>> The avro schema of the topic has a field of logical type
>>> timestamp-millis.  KafkaIO.read transform is creating a
>>> KafkaRecord<String,GenericRecord>, where this field is being converted to
>>> joda-time.
>>>
>>> In my Pardo transform, I am trying to use the AvroUtils class methods to
>>> convert the generic record to Beam Row and getting below class cast
>>> exception for the joda-time attribute.
>>>
>>>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>
>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>> be cast to java.lang.Long
>>>     at
>>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>     at
>>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>
>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>> for this
>>>
>>>
>>>
>>> *Thanks & Regards,*
>>>
>>> *Vishwas *
>>>
>>>

Reply via email to