Hi Rui,

I checked the AvroUtils code. There is a static initializer block that
registers Avro timestamp conversion functions for the logical type
timestamp-millis.

*// Code Snippet below*
static {
  // This works around a bug in the Avro library (AVRO-1891) around
  // SpecificRecord's handling of DateTime types.
  SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
  GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
}

Because of this, when deserializing a GenericRecord from Kafka using
KafkaAvroDeserializer, the long value produced at the producer end gets
converted to a joda-time DateTime during deserialization.
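
For illustration, a minimal sketch of the symptom (the helper name and the
use of the "timeOfRelease" field from the schema further down this thread
are mine, just for demonstration):

import org.apache.avro.Conversion;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper called inside the ParDo: once the static block above
// has run, GenericData maps timestamp-millis to the Joda conversion, so the
// deserialized field comes back as a DateTime rather than a Long.
static void inspectTimestampField(GenericRecord genericRecord) {
    Conversion<?> conversion =
        GenericData.get().getConversionFor(LogicalTypes.timestampMillis());
    System.out.println(conversion);                                    // TimestampConversion
    System.out.println(genericRecord.get("timeOfRelease").getClass()); // org.joda.time.DateTime
}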

Next, when we try to convert this GenericRecord to a Row via
AvroUtils.toBeamRowStrict, the code again tries to convert the received
value to joda-time, but an exception is thrown because the value is first
cast to Long.

*// Code Snippet Below:*
else if (logicalType instanceof LogicalTypes.TimestampMillis) {
  return convertDateTimeStrict((Long) value, fieldType);  // <-- ClassCastException is thrown here: the value is already a joda-time DateTime, yet it is cast to Long
}

private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
  checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
  return new Instant(value);  // <-- creates a joda-time Instant here
}


*Thanks & Regards,*

*Vishwas *



On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote:

> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
> GenericRecord, and tries to cast objects based on their types (and
> cast(object) to long for "timestamp-millis"). see [1].
>
> So in order to use `AvroUtils.toBeamRowStrict`, the generated
> GenericRecord should have long for "timestamp-millis".
>
> The schema you pasted looks right. Not sure why the generated class uses
> Joda time (is it controlled by some flags?). But at least you could write a
> small function to do the schema conversion for your need.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>
>
> Rui
>
>
> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>
>> Hi Rui,
>>
>> I agree that by converting it to long, there will be no error.
>> But KafkaIO is giving a GenericRecord with an attribute of type
>> JodaTime. Even if I convert it to long, AvroUtils.toBeamRowStrict then
>> converts it back to JodaTime.
>>
>> I used the avro tools 1.8.2 jar, for the below schema and I see that the
>> generated class has a JodaTime attribute.
>>
>> {
>>             "name": "timeOfRelease",
>>             "type":
>>                 {
>>                     "type": "long",
>>                     "logicalType": "timestamp-millis",
>>                     "connect.version": 1,
>>                     "connect.name":
>> "org.apache.kafka.connect.data.Timestamp"
>>                 }
>>  }
>>
>> *Attribute type in generated class:*
>> private org.joda.time.DateTime timeOfRelease;
>>
>>
>> So not sure why this type casting is required.
>>
>>
>> *Thanks & Regards,*
>>
>> *Vishwas *
>>
>>
>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>>
>>> Reading the code, it seems that, as the logical type "timestamp-millis"
>>> implies, it expects millis as Long values under this logical type.
>>>
>>> So if you convert joda-time to millis before calling
>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>>> will be gone.
>>>
>>> -Rui
>>>
>>>
>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> +dev <d...@beam.apache.org>
>>>>
>>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Below is my pipeline:
>>>>>
>>>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql
>>>>> ---------------> KafkaSink(KafkaIO.write)
>>>>>
>>>>>
>>>>> The avro schema of the topic has a field of logical type
>>>>> timestamp-millis.  KafkaIO.read transform is creating a
>>>>> KafkaRecord<String,GenericRecord>, where this field is being converted to
>>>>> joda-time.
>>>>>
>>>>> In my Pardo transform, I am trying to use the AvroUtils class methods
>>>>> to convert the generic record to Beam Row and getting below class cast
>>>>> exception for the joda-time attribute.
>>>>>
>>>>>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>>
>>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>>>> be cast to java.lang.Long
>>>>>     at
>>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>>     at
>>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>>
>>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>>>> for this
>>>>>
>>>>>
>>>>>
>>>>> *Thanks & Regards,*
>>>>>
>>>>> *Vishwas *
>>>>>
>>>>>
