Re: Proper way to get DataStream

2021-04-09 Thread Maminspapin
Arvid Heise-4, OK, this is clear to me now. Good answer.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Proper way to get DataStream

2021-04-08 Thread Arvid Heise
Hi Maminspapin,

I just answered another question similarly, so let me just c&p it here:

The beauty of Avro lies in having reader and writer schema and schema
compatibility, such that if your schema evolves over time (which will
happen in streaming naturally but is also very common in batch), you can
still use your application as is without modification. For streaming, this
methodology also implies that you can process elements with different
schema versions in the same run, which is mandatory for any non-toy example.

If you read into this topic, you will realize that it doesn't make sense to
read Avro without specifying your reader schema (except for some generic
applications, but those should be written against DataStream). If you keep
in mind that the same dataset can carry different schemas, you will notice
that your idea quickly reaches its limits (which schema to take?). What you
could do is write a small script that generates the schema DDL from the
current schema of your actual data, if you have very many columns and
datasets. It would certainly also be an interesting idea to pass a static
Avro/JSON schema to the DDL.

Note that in KafkaStreams, you have the same issue. You usually generate
your Java classes from some schema version, which will become your reader
schema. You can and should do the same in Flink. Please read [1] for more
information.

[1] https://www.baeldung.com/java-apache-avro#read-schema
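To make the reader/writer-schema point concrete, here is a minimal sketch in plain Java (assuming only the Avro library on the classpath; the `User` record and its fields are invented for illustration). A record written with an old writer schema is read with a newer reader schema whose added field carries a default:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionDemo {

    // Writer schema: what the producer used when the record was written.
    static final String WRITER_SCHEMA = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"userId\",\"type\":\"string\"}]}";

    // Reader schema: what our application expects; the new field has a default.
    static final String READER_SCHEMA = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"userId\",\"type\":\"string\"},"
            + "{\"name\":\"country\",\"type\":\"string\",\"default\":\"unknown\"}]}";

    static GenericRecord roundTrip() throws IOException {
        Schema writer = new Schema.Parser().parse(WRITER_SCHEMA);
        Schema reader = new Schema.Parser().parse(READER_SCHEMA);

        // Serialize a record using the (old) writer schema.
        GenericRecord user = new GenericData.Record(writer);
        user.put("userId", "u-1");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(user, encoder);
        encoder.flush();

        // Deserialize with BOTH schemas: Avro resolves writer -> reader,
        // filling the missing "country" field with its default.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        return new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord resolved = roundTrip();
        System.out.println(resolved.get("country")); // the default fills in: unknown
    }
}
```

This per-record resolution is what ConfluentRegistryAvroDeserializationSchema does for you: it looks up the writer schema by the ID embedded in the message and resolves it against the reader schema you supply.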



Re: Proper way to get DataStream

2021-04-04 Thread Maminspapin
Hi, @Arvid Heise-4, @Matthias

I really appreciate your attention, guys, and sorry for my late reply.

Yes, Arvid, you are right, the second way does in fact work. I copied the
schema from the Schema Registry using its API and created the .avsc file.
And thanks again for explaining why the first way is not compatible.

So, my code to define the schema is (I don't know whether it's a good decision...):

Path path = Paths.get("path_to_schema/schema.avsc");
String content = new String(Files.readAllBytes(path));
Schema schema = new Schema.Parser().parse(content);

And it really works.

But I don't understand why I should use two schemas:
1. the schema I created (reader schema)
2. the schema I get via the Schema Registry URL (writer schema)

I have some experience with the KafkaStreams library, and there is no need
to obtain a reader schema there. There is a single service for managing
schemas - the Schema Registry. Why not use a single source for the schema
in Flink?

Again, the second way is correct, and I can go further with my program.

Thanks.





Re: Proper way to get DataStream

2021-04-01 Thread Arvid Heise
Hi,

it seems as if the data was written with a Confluent registry in mind, so
you cannot use option 1: the Kafka record is not valid standalone Avro, as
it contains a 5-byte prefix that identifies the schema.

So the second way is the way to go, and it actually works well: it tells
you that you are reading with a schema that does not match the data. Once
you use the correct schema (user_visit.Envelope), it will work.
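For illustration, that framing can be inspected with plain Java (no Flink or Avro dependencies; the class and method names here are mine): the Confluent wire format is one magic byte (0x0), a 4-byte big-endian schema ID, and then the raw Avro payload.

```java
import java.nio.ByteBuffer;

public class ConfluentFraming {

    // Confluent wire format: 1 magic byte (0x0), 4-byte big-endian schema ID,
    // then the Avro-encoded payload. A plain Avro decoder chokes on the prefix.
    public static int schemaId(byte[] record) {
        if (record.length < 5 || record[0] != 0) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro record");
        }
        return ByteBuffer.wrap(record, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // A fake record: magic byte 0, schema ID 42, then payload bytes.
        byte[] record = {0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println(schemaId(record)); // prints 42
    }
}
```

This is why AvroDeserializationSchema fails with an EOFException on such records, while the Confluent-aware deserializer strips the prefix and fetches the writer schema by ID.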



Re: Proper way to get DataStream

2021-03-31 Thread Matthias Pohl
Hi Maminspapin again,
have you checked whether your topic actually contains data that matches
the schema you specified through cep.model.User?

Best,
Matthias



Proper way to get DataStream

2021-03-30 Thread Maminspapin
Hi,

I'm trying to read data from a Kafka topic. This topic holds Avro-format
data.

I wrote the following code:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    Schema schema = ReflectData.get().getSchema(User.class);
    FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
            "test_topic",
            // First
            AvroDeserializationSchema.forGeneric(schema),
            // Second
            // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
            //         "http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());

    DataStream<GenericRecord> userStream =
            env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
    userStream.print("users");

    env.execute();
}

So, as I understand it, there are two ways to get the result:
1. AvroDeserializationSchema.forGeneric(schema)
2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081")

And I use ReflectData.get().getSchema(User.class) to get the schema.


Please, Flink gurus, tell me whether I am on the right track or not.


If I use the first way, I get the following error:

java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
at 
org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)

If I use the second way, I get the following error:

Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
expecting cep.model.User, missing required field userId
at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)

How can I get the correct result?

Sorry if this is a duplicate:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html

Today is the third day I've been working on this issue (((



