Oops, accidentally sent the email.
The good news is that you don’t have to checkpoint the state of the Kafka 
consumers.

From: Wong Victor <jiasheng.w...@outlook.com>
Date: Tuesday, August 6, 2019 at 11:31 PM
To: Piotr Nowojski <pi...@ververica.com>, 黄兆鹏 <paulhu...@easyops.cn>
Cc: user <user@flink.apache.org>
Subject: Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,
If the performance impact of braking the operator chain is huge, maybe you can 
read the latest schema from Kafka within the operators.

It’s a little complicated, you have to start a Kafka consumer in e.g. ` 
RichFunction#open()` and reading from (the largest offset – 1), and handle new 
messages coming in.
The good news

From: Piotr Nowojski <pi...@ververica.com>
Date: Tuesday, August 6, 2019 at 8:55 PM
To: 黄兆鹏 <paulhu...@easyops.cn>
Cc: user <user@flink.apache.org>
Subject: Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek



On 6 Aug 2019, at 13:11, 黄兆鹏 
<paulhu...@easyops.cn<mailto:paulhu...@easyops.cn>> wrote:

Hi, Piotrek,
I previously considered your first advice(use union record type), but I found 
that the schema would be only sent to one subtask of the operator(for example, 
operatorA), and other subtasks of the operator are not aware of it.
In this case is there anything I have missed?

Thank you!






------------------ Original ------------------
From:  "Piotr Nowojski"<pi...@ververica.com<mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"<paulhu...@easyops.cn<mailto:paulhu...@easyops.cn>>;
Cc:  "user"<user@flink.apache.org<mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Have you measured the performance impact of braking the operator chain?

This is a current limitation of Flink chaining, that if an operator has two 
inputs, it can be chained to something else (only one input operators are 
chained together). There are plans for the future to address this issue.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.

Piotrek



On 6 Aug 2019, at 10:50, 黄兆鹏 
<paulhu...@easyops.cn<mailto:paulhu...@easyops.cn>> wrote:

Hi Piotrek,
Thanks for your reply, my broadcast stream just listen to the changes of the 
schema, and it's very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
                          ^                   ^                      ^
                          |                    |                        |
                                  BroadcastStream

the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
                          ^
                          |
                BroadcastStream


The benefits of the first approach is that the flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema will be broadcasted to each operator.
But the disadvantage of the first approache is that it breaks the operator 
chain, so operators may not be executed in the same slot and gain worse 
performance.

The second approach does not have the problem as the first one, but each 
message will carry its schema info among operators, it will cost about 2x for 
serialization and deserialization between operators.

Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining?

Thanks!



------------------ Original ------------------
From:  "Piotr Nowojski"<pi...@ververica.com<mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"<paulhu...@easyops.cn<mailto:paulhu...@easyops.cn>>;
Cc:  "user"<user@flink.apache.org<mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek

> On 6 Aug 2019, at 09:56, 黄兆鹏 
> <paulhu...@easyops.cn<mailto:paulhu...@easyops.cn>> wrote:
>
> Hi all,
> My flink job has dynamic schema of data, so I want to consume a schema kafka 
> topic and try to broadcast to every operator so that each operator could know 
> what kind of data it is handling.
>
> For example, the two streams just like this:
> OperatorA  ->  OperatorB  -> OperatorC
>       ^                   ^                      ^
>       |                    |                       |
>                BroadcastStream
>
> If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC are 
> chained together in one slot because they have the same parallelism so that 
> it can gain maximum performance.
>
> And I was wondering that if the broadcast stream exists, will it affect the 
> performance? Or flink will still chain them together to gain maximum 
> performance?
>
> Thanks!


Reply via email to