Hi!

@Arvid: We are using Avro 1.8, I believe, but this problem seems to come
from the Flink side, as Dawid mentioned.

@Dawid:
Sounds like a reasonable explanation. Here are the actual queries to
reproduce it within the SQL client / Table API:

CREATE TABLE source_table (
    int_field INT,
    timestamp_field TIMESTAMP(3)
) WITH (
    'connector.type'                         = 'kafka',
    'connector.version'                      = 'universal',
    'connector.topic'                        = 'avro_tset',
    'connector.properties.bootstrap.servers' = '<...>',
    'format.type'                            = 'avro',
    'format.avro-schema' =
    '{
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "int_field", "type": "int"},
        {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      ]
    }'
);

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:

Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
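For reference, what the writer ultimately needs for a timestamp-millis
field is a plain long of epoch milliseconds, which is exactly the
conversion the failing cast skips. A minimal, Avro-free sketch of the two
conversions involved (class and method names are hypothetical, just to
illustrate the gap):

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampMillis {

    // The serializer's expectation for a TIMESTAMP column: a
    // java.sql.Timestamp, whose epoch millis it can read directly.
    static long fromSqlTimestamp(Timestamp ts) {
        return ts.getTime();
    }

    // The conversion that would be needed for the LocalDateTime the
    // planner actually produces: an explicit offset (UTC here, as an
    // assumption) is required to get epoch millis.
    static long fromLocalDateTime(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ldt = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        // 1999-11-11T11:11:11Z = 942318671000 ms since the epoch
        System.out.println(fromLocalDateTime(ldt));
    }
}
```

Neither conversion happens for the LocalDateTime, so the writer falls
through to the raw long branch and the ClassCastException above is thrown.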

I will open a Jira ticket as well with these details.

Thank you!

Gyula


On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Gyula,
>
> I have not verified it locally yet, but I think you are hitting yet
> another problem of the unfinished migration from the old
> TypeInformation-based type system to the new type system based on
> DataTypes. As far as I understand the problem, the information about
> the bridging class (java.sql.Timestamp in this case) is lost somewhere
> in the stack. Because this information is lost/not respected, the
> planner produces LocalDateTime instead of a proper java.sql.Timestamp.
> The AvroRowSerializationSchema expects java.sql.Timestamp for a column
> of TIMESTAMP type and thus fails for LocalDateTime. I really hope the
> effort of FLIP-95 will significantly reduce the number of such problems.
>
> It's definitely worth reporting a bug.
>
> BTW could you share how you create the Kafka Table sink to have the full
> picture?
>
> Best,
>
> Dawid
> On 29/04/2020 15:42, Gyula Fóra wrote:
>
> Hi All!
>
> We are trying to work with avro serialized data from Kafka using the Table
> API and use TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
>
>   {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
>        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the Avro format (serializer) is not aware of the logical
> type conversion needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase, which uses all kinds of logical types,
> but I could only find logic that maps between Avro POJOs and tables,
> and none that actually exercises the serialization/deserialization
> logic of the format.
>
> Could someone please help me with this? Maybe what I am trying to do is not 
> possible, or I just missed a crucial step.
>
> Thank you!
> Gyula
>
