Hi Gyula,

it may still be worth to try to upgrade to Avro 1.9.2 (can never hurt) and
see if this solves your particular problem.
The code path in GenericDatumWriter is taking the conversion path, so it
might just work. Of course that depends on the schema being correctly
translated to a specific record that uses the new TimeConversions [1].

[1]
https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java

On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi!
>
> @Arvid: We are using Avro 1.8 I believe but this problem seems to come
> from the flink side as Dawid mentioned.
>
> @Dawid:
> Sounds like a reasonable explanation, here are the actual queries to
> reproduce within the SQL client/table api:
>
> CREATE TABLE source_table (
>       int_field INT,
>       timestamp_field TIMESTAMP(3)
> ) WITH (
>       'connector.type'                           = 'kafka',
>       'connector.version'                        = 'universal',
>       'connector.topic'                          = 'avro_tset',
>       'connector.properties.bootstrap.servers'   = '<...>',
>       'format.type'                              = 'avro',
>     'format.avro-schema' =
>     '{
>       "type": "record",
>       "name": "test",
>       "fields" : [
>         {"name": "int_field", "type": "int"},
>         {"name": "timestamp_field", "type": {"type":"long", "logicalType": 
> "timestamp-millis"}}
>       ]
>     }'
> )
>
> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
> And the error:
>
> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be 
> cast to java.lang.Long
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>       at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>       at 
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>       at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>       at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>       at 
> org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>
> I will open a Jira ticket as well with these details.
>
> Thank you!
>
> Gyula
>
>
> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Gyula,
>>
>> I have not verified it locally yet, but I think you are hitting yet
>> another problem of the unfinished migration from old TypeInformation based
>> type system to the new type system based on DataTypes. As far as I
>> understand the problem the information about the bridging class
>> (java.sql.Timestamp in this case) is lost in the stack. Because this
>> information is lost/not respected the planner produces LocalDateTime
>> instead of a proper java.sql.Timestamp time. The AvroRowSerializationSchema
>> expects java.sql.Timestamp for a column of TIMESTAMP type and thus it fails
>> for LocalDateTime. I really hope the effort of FLIP-95 will significantly
>> reduce the number of problems.
>>
>> It's definitely worth reporting a bug.
>>
>> BTW could you share how you create the Kafka Table sink to have the full
>> picture?
>>
>> Best,
>>
>> Dawid
>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>
>> Hi All!
>>
>> We are trying to work with avro serialized data from Kafka using the
>> Table API and use TIMESTAMP column type.
>>
>> According to the docs
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
>> we can use long type with logicalType: timestamp-millis.
>> So we use the following avro field schema in the descriptor:
>>
>>
>>   {"name": "timestamp_field", "type": {"type":"long", "logicalType": 
>> "timestamp-millis"}}
>>
>> When trying to insert into the table we get the following error:
>>
>> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime 
>> cannot be cast to class java.lang.Long (java.time.LocalDateTime and 
>> java.lang.Long are in module java.base of loader 'bootstrap')     at 
>> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>        at 
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>
>> It seems like the avro format (serializer) is not aware of the logical type 
>> conversion that is needed to convert back to the physical type long.
>>
>> I looked at the AvroTypesITCase which uses all kinds of logical types but I 
>> could only find logic that maps between Avro Pojos and tables and none that 
>> actually uses the serializaiton/deserialization logic with the format.
>>
>> Could someone please help me with this? Maybe what I am trying to do is not 
>> possible, or I just missed a crucial step.
>>
>> Thank you!
>> Gyula
>>
>>
>>
>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to