Hi Maminspapin again,
have you checked whether your topic actually contains data that matches
your schema specified through cep.model.User?

Best,
Matthias

On Tue, Mar 30, 2021 at 3:39 PM Maminspapin <un...@mail.ru> wrote:

> Hi,
>
> I'm trying to solve a task with getting data from topic. This topic keeps
> avro format data.
>
> I wrote next code:
>
>  public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Schema schema = ReflectData.get().getSchema(User.class);
>         FlinkKafkaConsumer<GenericRecord> userConsumer = new
> FlinkKafkaConsumer<>(
>                "test_topic",
>                 *// First*
>                 AvroDeserializationSchema.forGeneric(schema),
>                 *// Second*
>                 //
> ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";),
>                 getConsumerProperties());
>
>         DataStream<GenericRecord> userStream =
> env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
>         userStream.print("users");
>
>         env.execute();
>     }
>
> So, as I think right, there are two ways to get the result:
> 1. AvroDeserializationSchema.forGeneric(schema)
> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
> "http://xxx.xx.xxx.xx:8081";)
>
> And I use ReflectData.get().getSchema(User.class) to get schema.
>
>
> Please, Flink guru, tell me if I am on the right way or not.
>
>
> If I use First way, there is next error:
>
> java.io.EOFException
>         at org.apache.avro.io
> .BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
>         at org.apache.avro.io
> .BinaryDecoder.readInt(BinaryDecoder.java:150)
>         at org.apache.avro.io
> .ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>
> If I use Second way, there is next error:
>
> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
> expecting cep.model.User, missing required field userId
>         at org.apache.avro.io
> .ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>         at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>
> How can I get the correct result?
>
> Sorry, if duplicated:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>
> Today is third day I'm working with this issue.... (((
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to