Hi again Maminspapin,

Have you checked whether your topic actually contains data that matches the schema you specified through cep.model.User?

Best,
Matthias

On Tue, Mar 30, 2021 at 3:39 PM Maminspapin <un...@mail.ru> wrote:

> Hi,
>
> I'm trying to solve a task with getting data from topic. This topic keeps
> avro format data.
>
> I wrote next code:
>
> public static void main(String[] args) throws Exception {
>
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Schema schema = ReflectData.get().getSchema(User.class);
>     FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
>             "test_topic",
>             // First
>             AvroDeserializationSchema.forGeneric(schema),
>             // Second
>             // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
>             //         "http://xxx.xx.xxx.xx:8081"),
>             getConsumerProperties());
>
>     DataStream<GenericRecord> userStream =
>             env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
>     userStream.print("users");
>
>     env.execute();
> }
>
> So, as I think right, there are two ways to get the result:
> 1. AvroDeserializationSchema.forGeneric(schema)
> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
>    "http://xxx.xx.xxx.xx:8081")
>
> And I use ReflectData.get().getSchema(User.class) to get schema.
>
> Please, Flink guru, tell me if I am on the right way or not.
>
> If I use First way, there is next error:
>
> java.io.EOFException
>     at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
>     at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
>     at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>
> If I use Second way, there is next error:
>
> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
> expecting cep.model.User, missing required field userId
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>
> How can I get the correct result?
>
> Sorry, if duplicated:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>
> Today is third day I'm working with this issue.... (((
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
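
One way to check what the topic really contains is to look at the raw bytes of a record value. The second error message ("Found user_visit.Envelope, expecting cep.model.User") says the schema the data was written with is user_visit.Envelope, so the reader schema built from User.class does not match it. Messages produced with the Confluent serializers also start with a 5-byte header (a magic byte 0 followed by a 4-byte big-endian schema ID), which plain Avro decoding does not expect. The sketch below is only an illustration using the JDK: the class and method names are made up for this example, and the check is a heuristic, since a plain Avro payload can also begin with a zero byte.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical helper (not part of Flink, Avro or Confluent) that inspects
// the raw value bytes of a Kafka record for the Confluent wire-format header.
final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0; // first byte of the header
    private static final int HEADER_LENGTH = 5; // magic byte + 4-byte schema ID

    // Returns the schema ID if the value looks Confluent-framed, otherwise empty.
    static OptionalInt schemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH || value[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        // ByteBuffer reads big-endian by default, which matches the header layout.
        return OptionalInt.of(ByteBuffer.wrap(value, 1, 4).getInt());
    }

    public static void main(String[] args) {
        // Made-up sample bytes: header with schema ID 42, then two payload bytes.
        byte[] sample = {0, 0, 0, 0, 42, 2, 10};
        System.out.println(schemaId(sample));
    }
}
```

If a schema ID comes back, the schema registered under that ID in the schema registry shows which schema the producer actually used, and that can then be compared with the one derived from cep.model.User.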