Hi Adi,

To me, this looks like a version conflict of some kind. Maybe you use
different Avro versions for your user program and on your Flink cluster.
Could you check that you don't have conflicting versions on your classpath?
It would also be helpful to have a minimal example that allows
reproducing the problem (e.g. repo with Flink job and test data).

I am forwarding this mail also to the user ML. Maybe some other community
members have seen this problem before.

Cheers,
Till

On Fri, May 14, 2021 at 9:53 PM Adishesh Kishore <adish...@uniphi.it> wrote:

> Hi Till,
>
> I am using an avro schema to write data to a parquet sink. I am getting
> the following stack trace for some reason(Though I am not using a decimal
> logical type anywhere),
>
> java.lang.IncompatibleClassChangeError: class org.apache.avro.
> LogicalTypes$Decimal has interface org.apache.avro.LogicalType as super
> class
>     at java.lang.ClassLoader.defineClass1(Native Method)
>     at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:
> 142)
>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertField(
> AvroSchemaConverter.java:180)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(
> AvroSchemaConverter.java:214)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertField(
> AvroSchemaConverter.java:171)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertField(
> AvroSchemaConverter.java:130)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertField(
> AvroSchemaConverter.java:227)
>     at org.apache.parquet.avro.AvroSchemaConverter.convertFields(
> AvroSchemaConverter.java:124)
>     at org.apache.parquet.avro.AvroSchemaConverter.convert(
> AvroSchemaConverter.java:115)
>     at org.apache.parquet.avro.AvroParquetWriter.writeSupport(
> AvroParquetWriter.java:150)
>     at org.apache.parquet.avro.AvroParquetWriter.access$200(
> AvroParquetWriter.java:36)
>     at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(
> AvroParquetWriter.java:182)
>     at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter
> .java:529)
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> .createAvroParquetWriter(ParquetAvroWriters.java:87)
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> .lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:61)
>     at org.apache.flink.formats.parquet.ParquetWriterFactory.create(
> ParquetWriterFactory.java:57)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.
> BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .rollPartFile(Bucket.java:222)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .write(Bucket.java:212)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets
> .onElement(Buckets.java:378)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingParquetSink.invoke(StreamingParquetSink.java:555)
>
> I am using the following schema to create the writer,
>
> {
>   "type": "error",
>   "name": "root",
>   "namespace": "root",
>   "doc": "",
>   "fields": [
>     {
>       "name": "account_id",
>       "type": [
>         "long",
>         "null"
>       ],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "amount",
>       "type": [
>         "long",
>         "null"
>       ],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "date",
>       "type": [
>         {
>           "type": "string",
>           "logicalType": "date"
>         },
>         "null"
>       ],
>       "doc": "",
>       "default": ""
>     },
>     {
>       "name": "duration",
>       "type": [
>         "long",
>         "null"
>       ],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "loan_id",
>       "type": [
>         "long",
>         "null"
>       ],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "payments",
>       "type": [
>         "double",
>         "null"
>       ],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "status",
>       "type": [
>         "string",
>         "null"
>       ],
>       "doc": "",
>       "default": ""
>     }
>   ]
> }
>
>
> Appreciate any help in advance!
>
> Thanks,
> Adi
>

Reply via email to