Thanks Guowei, setting format.setReuseAvroValue(false) with 1.8.2-generated records does not solve the problem.
12:02:59,314 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.

java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)

----------------------------

Summarising, the only working combination seems to be:
- Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation = joda
- Enable Object Reuse (being careful about the implications)

Using AVRO 1.8.2 code generation does not work with any of the other workarounds.
Using Generic objects does not work due to a bug in AvroSerializer <https://issues.apache.org/jira/browse/FLINK-18223>, but GenericRecords also bring a number of other problems.
I am not very comfortable using AVRO objects generated with a different AVRO version than the one supported by Flink.
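For anyone landing on this thread: "being careful about the implications" of object reuse mostly means never holding on to a record instance across invocations, because the runtime may hand your function the same mutable object again and again. A minimal, Flink-free Java sketch of the pitfall (class and field names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {
    // A mutable record, standing in for a reused Avro/POJO instance.
    static class Record {
        long value;
    }

    public static void main(String[] args) {
        Record reused = new Record(); // the runtime reuses ONE instance
        List<Record> cachedRefs = new ArrayList<>();
        List<Long> copiedValues = new ArrayList<>();

        for (long v = 1; v <= 3; v++) {
            reused.value = v;               // runtime overwrites the same object
            cachedRefs.add(reused);         // UNSAFE: caching the reference
            copiedValues.add(reused.value); // SAFE: copying what you need
        }

        // Every cached reference now sees only the last value written.
        System.out.println(cachedRefs.get(0).value); // prints 3, not 1
        System.out.println(copiedValues.get(0));     // prints 1
    }
}
```

With object reuse enabled, copy out the fields you need (or deep-copy the record) instead of storing the reference.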
I am going to map AVRO records into hand-written POJOs immediately after the ingestion, to reduce the chances of further issues. I reckon this is very empirical, but that's what the workaround looks like to me :)

Lorenzo

P.S. I want to give massive thanks to this community. So far it has been one of the most reactive and helpful I have ever interacted with.

On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> for the 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
>
> Best,
> Guowei
>
> Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote on Thu, 11 Jun 2020 at 17:02:
>
>> Hi Arvid,
>>
>> thanks for the point about caching records. Gotcha!
>>
>> Sorry, I cannot share the full schema or generated code. It's 3rd-party
>> IP and we signed a meter-thick NDA... I think I can post snippets.
>> The schema is heavily nested, including arrays of other record types.
>> Types are primitives, or logical decimal and timestamp-millis. No unions.
>>
>> #conversions is in AccountEntries only (one of the nested records) and
>> looks like this:
>>
>> private static final org.apache.avro.Conversion<?>[] conversions =
>>     new org.apache.avro.Conversion<?>[] {
>>         null,
>>         null,
>>         null,
>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>         null,
>>         null,
>>         null,
>>         null,
>>         null,
>>         null,
>>         null
>>     };
>>
>> Note that I have to generate the specific object with the AVRO 1.9.2 Maven
>> Plugin.
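The hand-written-POJO workaround mentioned above could be sketched as follows, in plain JDK code. The real schema is under NDA, so the class and field names here are hypothetical; the point is converting Avro's on-the-wire timestamp-millis value (an epoch-millis long) to java.time.Instant by hand, sidestepping the logical-type conversion machinery that differs between Avro 1.8.2 and 1.9.2:

```java
import java.time.Instant;

public class AccountEntryMapper {
    // Hypothetical hand-written POJO; field names are illustrative,
    // not the real (NDA-covered) schema.
    public static class AccountEntryPojo {
        public String accountId;
        public Instant createdAt;
    }

    // Avro's timestamp-millis logical type is a long (epoch millis) on
    // the wire; converting it ourselves avoids depending on whether the
    // generated record uses Joda DateTime or java.time.Instant.
    public static AccountEntryPojo fromAvroFields(String accountId, long createdAtMillis) {
        AccountEntryPojo pojo = new AccountEntryPojo();
        pojo.accountId = accountId;
        pojo.createdAt = Instant.ofEpochMilli(createdAtMillis);
        return pojo;
    }

    public static void main(String[] args) {
        // Epoch millis for the timestamp seen in the stack traces.
        AccountEntryPojo p = fromAvroFields("acc-1", 1590976842105L);
        System.out.println(p.createdAt); // 2020-06-01T02:00:42.105Z
    }
}
```

In a Flink job, a mapper like this would sit in a map() right after the source, so only plain POJOs flow through the rest of the pipeline.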
>> With 1.8.2-generated code it fails with the following exception,
>> regardless of setting enableObjectReuse():
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>> Thanks for the help
>> Lorenzo
>>
>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Lorenzo,
>>>
>>> I'm glad that it worked out somehow, but I'd still like to understand
>>> what went wrong, so it will work more smoothly for future users. I
>>> double-checked and we even test AvroSerializer with logical types, so
>>> I'm a bit puzzled.
>>>
>>> Could you attach GlHeader, or at least show us what GlHeader#conversions
>>> looks like? I want to exclude the possibility that the source generator
>>> screwed up.
>>> Concerning object reuse: you need to treat all POJOs as immutable
>>> (I'm assuming that's what you meant from your description), but you
>>> should also never cache values, like:
>>>
>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>     Object lastElement;
>>>
>>>     public void flatMap(Object newElement, Collector<Object> out) {
>>>         out.collect(lastElement);
>>>         lastElement = newElement; // <- never cache with enableObjectReuse
>>>     }
>>> }
>>>
>>> (excuse my ugly code)
>>>
>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> answering your other questions.
>>>>
>>>> Here is the stacktrace of case (1), when I try to read using specific
>>>> records generated by the AVRO 1.8.2 plugin:
>>>>
>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>> at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>
>>>> I also tried generating the specific object with AVRO 1.9.2 (case 2),
>>>> forcing it to use Joda time, but it still didn't work:
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>> ... 7 more
>>>>
>>>> In the second case, it seems the failure happens when Flink tries to
>>>> make a copy of the record.
>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>
>>>> I am not sure I understand all the implications of object reuse in
>>>> Flink, specifically.
>>>> I am familiar with the general risk of mutable messages, and I always
>>>> handle them as immutable even when they are POJOs, never mutating and
>>>> forwarding the same record.
>>>> Not sure whether there are other implications in Flink.
>>>>
>>>> Many thanks
>>>> Lorenzo
>>>>
>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>> stacktrace.
>>>>>
>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>> probably see the same issue as 1).
>>>>>
>>>>> 3) I'm surprised that this doesn't work either. There has been a
>>>>> codepath for GenericRecord since 2016, and it's covered in a test. From
>>>>> the error description and the ticket, it looks like the issue is not
>>>>> the AvroInputFormat but the serializer. So it would probably work with
>>>>> a different serializer (but that would cause back-and-forth type
>>>>> transformation).
>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Timo,
>>>>>>
>>>>>> the stacktrace with the 1.9.2-generated specific file is the following:
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>> ... 7 more
>>>>>>
>>>>>> I reckon logical types might have been considered somehow experimental
>>>>>> since... ever. But, honestly, I've been using them in the Kafka/Java
>>>>>> ecosystem as well as in Spark without too many problems.
>>>>>>
>>>>>> For my specific use case, the schema is given. Messages are produced
>>>>>> by a 3rd party and we cannot change the schema (especially because it's
>>>>>> a legit schema).
>>>>>> I am desperately looking for a workaround.
>>>>>>
>>>>>> I had a similar issue with a Kafka source and AVRO records containing
>>>>>> decimals and timestamps. Timestamps worked but not decimals.
>>>>>> I was able to work around the problem using GenericRecords.
>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>>
>>>>>> I honestly find it very confusing having different ways of handling
>>>>>> AVRO deserialization inside Flink core components.
>>>>>>
>>>>>> Cheers
>>>>>> Lorenzo
>>>>>>
>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Lorenzo,
>>>>>>>
>>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>>
>>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>
>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>> work, because specific records are handled as POJOs and those should
>>>>>>> also be able to deal with logical types' classes through Kryo.
>>>>>>>
>>>>>>> Reg 3) Generic record:
>>>>>>>
>>>>>>> It would be great if we can make this option possible. We could
>>>>>>> include it in the next minor release fix.
>>>>>>>
>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>
>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>> > Files are written by an S3 Sink Kafka Connect, but S3 is not the point
>>>>>>> > here. I started by trying to ingest a static bunch of files from the
>>>>>>> > local fs first, and I am having weird issues with AVRO deserialization.
>>>>>>> >
>>>>>>> > I have to say, the records contain logical types: timestamp-millis
>>>>>>> > and decimals.
>>>>>>> >
>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>>> > used avro-maven-plugin to generate POJOs.
>>>>>>> > I tried multiple combinations, all with no luck.
>>>>>>> >
>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>> >
>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in, AccountEntries.class);
>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>> >
>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>> > Same code as above, but AVRO POJOs are generated with AVRO 1.9.2
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>> >
>>>>>>> > 3) Generic record
>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>> > convenience, but I am not using the generated POJO as the record.
>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>>>> > to explicitly specify the TypeInfo with returns(...):
>>>>>>> >
>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in, GenericRecord.class);
>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>>> > The class is not a proper class. It is either abstract, an interface,
>>>>>>> > or a primitive type.
>>>>>>> >
>>>>>>> > This looks like a bug.
>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>> > and I will try to submit a fix, but it still does not solve my problem,
>>>>>>> > as I am using a managed Flink I cannot update.
>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to
>>>>>>> > do anything bizarre. Am I?
>>>>>>> >
>>>>>>> > Any ideas?
>>>>>>> > Am I missing something obvious?
>>>>>>> > Cheers
>>>>>>> > Lorenzo
>>>>>
>>>>> --
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng