Hi team,

I want to implement a Flink job that reads Avro files from an S3 path and writes them to a Kafka topic.
Currently, I am using AvroInputFormat like this:

    AvroInputFormat<Session> avroInputFormat = new AvroInputFormat<>(new Path(S3PathString), Session.class);
    TypeInformation<Session> typeInfo = TypeInformation.of(Session.class);
    DataStream<Session> datastream = env.createInput(avroInputFormat, typeInfo);

However, the job fails with an AvroTypeException. Does this mean AvroInputFormat is not a good way to read from an S3 path? What about FileSource? Is it better than AvroInputFormat? Can I get some suggestions here?

Thanks!

Best,
Lijuan

Attaching the exception from the job:

org.apache.avro.AvroTypeException: Found KafkaRecord, expecting ……Session, missing required field channel
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) ~[?:?]
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) ~[?:?]
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) ~[?:?]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[?:?]
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263) ~[?:?]
    at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170) ~[?:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:390) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Unknown Source) ~[?:?]