Hi,

I'm trying to solve a task with getting data from topic. This topic keeps
avro format data.

I wrote next code:   

 public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        Schema schema = ReflectData.get().getSchema(User.class);
        FlinkKafkaConsumer<GenericRecord> userConsumer = new
FlinkKafkaConsumer<>(
               "test_topic",
                *// First* 
                AvroDeserializationSchema.forGeneric(schema),
                *// Second*
                //
ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081";),
                getConsumerProperties());

        DataStream<GenericRecord> userStream =
env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
        userStream.print("users");

        env.execute();
    }

So, as I think right, there are two ways to get the result:
1. AvroDeserializationSchema.forGeneric(schema)
2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081";)

And I use ReflectData.get().getSchema(User.class) to get schema.


Please, Flink guru, tell me if I am on the right way or not.


If I use First way, there is next error:

java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
        at 
org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)

If I use Second way, there is next error:

Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
expecting cep.model.User, missing required field userId
        at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)

How can I get the correct result?

Sorry, if duplicated:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html

Today is third day I'm working with this issue.... (((




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to