Hi,

It looks like the data was written with the Confluent Schema Registry in
mind, so you cannot use option 1: the Kafka record value is not plain Avro,
because it starts with a 5-byte prefix that identifies the schema.
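
To make the prefix concrete: in the Confluent wire format the record value starts with a magic byte 0, followed by a 4-byte big-endian schema ID, and only then the Avro binary payload. Below is a minimal sketch that splits such a value. The class and method names are made up for illustration and are not part of Flink or any Confluent library, and the leading-zero check is only a heuristic, since a plain Avro payload can also begin with a zero byte.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative helper (hypothetical names): splits a Confluent-framed record value. */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int PREFIX_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Heuristic check: long enough to hold the prefix and starting with the magic byte. */
    static boolean hasConfluentPrefix(byte[] value) {
        return value != null && value.length >= PREFIX_LENGTH && value[0] == MAGIC_BYTE;
    }

    /** Reads the schema ID stored in bytes 1..4 (big-endian, ByteBuffer's default order). */
    static int schemaId(byte[] value) {
        if (!hasConfluentPrefix(value)) {
            throw new IllegalArgumentException("value does not start with the 5-byte prefix");
        }
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    /** Returns the bytes after the prefix, i.e. the part a plain Avro decoder expects. */
    static byte[] avroPayload(byte[] value) {
        if (!hasConfluentPrefix(value)) {
            throw new IllegalArgumentException("value does not start with the 5-byte prefix");
        }
        return Arrays.copyOfRange(value, PREFIX_LENGTH, value.length);
    }

    public static void main(String[] args) {
        // Made-up sample bytes: magic byte, schema ID 42, then two payload bytes.
        byte[] record = {0, 0, 0, 0, 42, 2, 10};
        System.out.println("schema id: " + schemaId(record));
        System.out.println("payload length: " + avroPayload(record).length);
    }
}
```

A plain Avro decoder handed the full value tries to interpret these five prefix bytes as record data, which is why option 1 fails while decoding.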

So the second way is the way to go, and it actually works as intended: the
error tells you that the schema you are reading with does not match the
schema the data was written with. Once you use the correct schema
(user_visit.Envelope), it will work.

On Wed, Mar 31, 2021 at 1:46 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Maminspapin again,
> have you checked whether your topic actually contains data that matches
> your schema specified through cep.model.User?
>
> Best,
> Matthias
>
> On Tue, Mar 30, 2021 at 3:39 PM Maminspapin <un...@mail.ru> wrote:
>
>> Hi,
>>
>> I'm trying to read data from a Kafka topic. The topic contains data in
>> Avro format.
>>
>> I wrote the following code:
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         Schema schema = ReflectData.get().getSchema(User.class);
>>         FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
>>                 "test_topic",
>>                 // First
>>                 AvroDeserializationSchema.forGeneric(schema),
>>                 // Second
>>                 // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081"),
>>                 getConsumerProperties());
>>
>>         DataStream<GenericRecord> userStream =
>>                 env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
>>         userStream.print("users");
>>
>>         env.execute();
>>     }
>>
>> As far as I understand, there are two ways to get the result:
>> 1. AvroDeserializationSchema.forGeneric(schema)
>> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
>> "http://xxx.xx.xxx.xx:8081")
>>
>> I use ReflectData.get().getSchema(User.class) to get the schema.
>>
>>
>> Flink gurus, please tell me whether I am on the right track.
>>
>>
>> If I use the first way, I get the following error:
>>
>> java.io.EOFException
>>         at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
>>         at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
>>         at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>>
>> If I use the second way, I get the following error:
>>
>> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
>> expecting cep.model.User, missing required field userId
>>         at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>         at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>
>> How can I get the correct result?
>>
>> Sorry if this is a duplicate:
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>>
>> This is the third day I've been working on this issue... (((
