Hi team,

I want to implement a Flink job that reads Avro files from an S3 path and writes
the records to a Kafka topic.

Currently, I am using AvroInputFormat like this:

// Session is the Avro-generated class; its schema is used as the reader schema
AvroInputFormat<Session> avroInputFormat =
    new AvroInputFormat<>(new Path(S3PathString), Session.class);
TypeInformation<Session> typeInfo = TypeInformation.of(Session.class);
DataStream<Session> datastream = env.createInput(avroInputFormat, typeInfo);

But the job fails with an AvroTypeException (full stack trace below).
Does this mean AvroInputFormat is not a good way to read from an S3 path?
What about FileSource? Would it be a better fit than AvroInputFormat?
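One thing I could check, regardless of AvroInputFormat vs. FileSource, is which writer schema the files under the S3 path actually carry. According to the Avro specification, an object container file starts with the four bytes `O`, `b`, `j`, `0x01`, followed by a metadata map (`map<bytes>`) whose `avro.schema` entry holds the writer schema as JSON. A rough, stdlib-only sketch of reading that map (my own helper, not part of Flink or Avro; it stops after the metadata and ignores the sync marker and data blocks):

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: reads only the header metadata of an Avro object container file.
final class AvroHeaderSketch {

    // Reads one Avro "long": a zig-zag encoded variable-length integer.
    static long readLong(InputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 63) throw new IOException("varint too long");
            b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zig-zag
    }

    // Reads Avro "bytes" (or "string"): a long length followed by that many bytes.
    static byte[] readBytes(DataInputStream in) throws IOException {
        long len = readLong(in);
        if (len < 0 || len > Integer.MAX_VALUE) throw new IOException("bad length " + len);
        byte[] buf = new byte[(int) len];
        in.readFully(buf);
        return buf;
    }

    // Returns the header metadata (e.g. "avro.schema", "avro.codec") decoded as UTF-8.
    static Map<String, String> readMetadata(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        byte[] magic = new byte[4];
        in.readFully(magic);
        if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j' || magic[3] != 1) {
            throw new IOException("not an Avro object container file");
        }
        Map<String, String> meta = new LinkedHashMap<>();
        long count;
        // The map is a sequence of blocks, terminated by a block count of 0.
        while ((count = readLong(in)) != 0) {
            if (count < 0) {
                // Negative count: |count| entries follow, preceded by the block size in bytes.
                count = -count;
                readLong(in); // block size, not needed here
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                String value = new String(readBytes(in), StandardCharsets.UTF_8);
                meta.put(key, value);
            }
        }
        return meta;
    }
}
```

For example, `AvroHeaderSketch.readMetadata(Files.newInputStream(Paths.get("part-0.avro"))).get("avro.schema")` on a file copied down from the S3 path (the file name is made up) should show whether the writer schema is `KafkaRecord` and whether it has a `channel` field. `avro-tools getschema` prints the same information from the command line.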

Can I get some suggestions here? Thanks!

Best,
Lijuan

Here is the full exception from the job:
org.apache.avro.AvroTypeException: Found KafkaRecord, expecting ……Session, missing required field channel
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) ~[?:?]
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) ~[?:?]
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) ~[?:?]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[?:?]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[?:?]
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263) ~[?:?]
    at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170) ~[?:?]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:390) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
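If I read the message correctly, the failure is in schema resolution rather than in S3 access: the files seem to have been written with a record schema named KafkaRecord, and Session declares a field `channel` that this writer schema does not contain and that has no default, so Avro cannot fill it in. To check my understanding, here is a simplified model of that rule (plain Java, my own illustration rather than Avro's code; it ignores field types, aliases, record-name matching and nested records, and the `id` field is made up):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Avro record resolution (an illustration, not Avro's implementation).
// Ignores field types, aliases, record-name matching, and nested records.
final class RecordResolutionSketch {

    // A field of the reader schema, optionally carrying a default value.
    static final class ReaderField {
        final String name;
        final boolean hasDefault;
        final Object defaultValue;

        private ReaderField(String name, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }

        static ReaderField required(String name) {
            return new ReaderField(name, false, null);
        }

        static ReaderField withDefault(String name, Object defaultValue) {
            return new ReaderField(name, true, defaultValue);
        }
    }

    // Fields present in the writer's data are copied, missing fields fall back to
    // their default, and a missing field without a default is an error.
    // Writer fields that the reader does not declare are dropped.
    static Map<String, Object> resolve(String writerName, String readerName,
                                       Map<String, Object> writerValues,
                                       List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (writerValues.containsKey(field.name)) {
                result.put(field.name, writerValues.get(field.name));
            } else if (field.hasDefault) {
                result.put(field.name, field.defaultValue);
            } else {
                // Mirrors the wording of the AvroTypeException in the trace above
                throw new IllegalStateException("Found " + writerName + ", expecting " + readerName
                        + ", missing required field " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("id", "abc"); // hypothetical field that the KafkaRecord data does have
        List<ReaderField> session = List.of(ReaderField.required("id"), ReaderField.required("channel"));
        try {
            resolve("KafkaRecord", "Session", written, session);
        } catch (IllegalStateException e) {
            // prints: Found KafkaRecord, expecting Session, missing required field channel
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, declaring `channel` with `ReaderField.withDefault(...)` instead of `required(...)` lets the same record resolve.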