Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: if an operator has two 
inputs, it cannot be chained to something else (only single-input operators 
are chained together). There are plans to address this issue in the future.

As a workaround, besides what you have mentioned:
- Maybe your record type can be a union: each element is either a Record or a 
Schema (not Record AND Schema), and upstream operators (operatorA) could just 
ignore/forward the Schema. You wouldn't need to send the schema with every 
record.
- Another (ugly) solution is to implement the BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking, and it 
just makes many things more complicated.
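
To make the first idea more concrete, here is a minimal sketch in plain Java 
(no Flink dependency), so treat it as an illustration and not as Flink API. 
The names Element, SchemaAwareOperator and runPipeline are made up for this 
example, and schemas/records are plain Strings only to keep it short. Each 
single-input operator remembers the latest Schema it has seen and forwards it 
unchanged, so records never carry their schema with them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionStreamSketch {

    /** Union element (hypothetical): EITHER a schema update OR a data record. */
    static final class Element {
        final String schema; // non-null only for schema updates
        final String record; // non-null only for data records

        private Element(String schema, String record) {
            this.schema = schema;
            this.record = record;
        }

        static Element ofSchema(String schema) { return new Element(schema, null); }
        static Element ofRecord(String record) { return new Element(null, record); }
        boolean isSchema() { return schema != null; }
    }

    /** Stand-in for a single-input operator such as operatorA (hypothetical). */
    static final class SchemaAwareOperator {
        private final String name;
        private String currentSchema = "none";

        SchemaAwareOperator(String name) { this.name = name; }

        Element process(Element in) {
            if (in.isSchema()) {
                // Remember the new schema and forward it unchanged downstream.
                currentSchema = in.schema;
                return in;
            }
            // Data record: handle it using the locally remembered schema.
            return Element.ofRecord(in.record + "|" + name + "@" + currentSchema);
        }
    }

    /** Pushes every element through A and then B, like two chained operators. */
    static List<String> runPipeline(List<Element> input) {
        SchemaAwareOperator a = new SchemaAwareOperator("A");
        SchemaAwareOperator b = new SchemaAwareOperator("B");
        List<String> out = new ArrayList<>();
        for (Element e : input) {
            Element result = b.process(a.process(e));
            if (!result.isSchema()) {
                out.add(result.record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element> input = Arrays.asList(
                Element.ofSchema("v1"), Element.ofRecord("r1"),
                Element.ofSchema("v2"), Element.ofRecord("r2"));
        runPipeline(input).forEach(System.out::println);
    }
}
```

The sketch ignores parallelism: in a real job every parallel instance of the 
chain would still need to receive each Schema element, and how you arrange 
that upstream of the chain is not shown here.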

Piotrek

> On 6 Aug 2019, at 10:50, 黄兆鹏 <paulhu...@easyops.cn> wrote:
> 
> Hi Piotrek,
> Thanks for your reply. My broadcast stream just listens for changes to the 
> schema, which are very infrequent and very lightweight.
> 
> In fact there are two ways to solve my problem,
> 
> the first one is a broadcast stream that listens for changes to the schema 
> and broadcasts them to every operator that handles the data, just as I 
> posted originally.
> DataStream: OperatorA  ->  OperatorB  ->  OperatorC
>                 ^              ^              ^
>                 |              |              |
>                         BroadcastStream
> 
> the second approach is that I have an operator that will join my data and 
> schema together and send to the downstream operators:
>  DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
>                           ^                 
>                           |                   
>                 BroadcastStream            
> 
> 
> The benefit of the first approach is that the Flink job does not have to 
> transfer the schema with the real data records among operators, because the 
> schema will be broadcast to each operator.
> But the disadvantage of the first approach is that it breaks the operator 
> chain, so operators may not be executed in the same slot and performance 
> may be worse.
> 
> The second approach does not have the problem of the first one, but each 
> message will carry its schema info among operators, which will cost about 2x 
> for serialization and deserialization between operators.
> 
> Is there a better workaround that lets all the operators notice the schema 
> change without breaking the operator chain? 
> 
> Thanks!
> 
>  
>  
> ------------------ Original ------------------
> From:  "Piotr Nowojski"<pi...@ververica.com>;
> Date:  Tue, Aug 6, 2019 04:23 PM
> To:  "黄兆鹏"<paulhu...@easyops.cn>;
> Cc:  "user"<user@flink.apache.org>;
> Subject:  Re: Will broadcast stream affect performance because of the absence 
> of operator chaining?
>  
> Hi,
> 
> Broadcasting will break an operator chain. However, my best guess is that 
> the Kafka source will still be the performance bottleneck in your job. Also, 
> network exchanges add measurable overhead only if your records are very 
> lightweight and easy to process (for example, if you are using RocksDB then 
> you can just ignore network costs).
> 
> Either way, you can just try this out. Pre-populate your Kafka topic with 
> some significant number of messages, run both jobs, compare the throughput 
> and decide based on those results whether this is ok for you or not.
> 
> Piotrek 
> 
> > On 6 Aug 2019, at 09:56, 黄兆鹏 <paulhu...@easyops.cn> wrote:
> > 
> > Hi all, 
> > My Flink job handles data with a dynamic schema, so I want to consume a 
> > schema Kafka topic and broadcast it to every operator so that each 
> > operator knows what kind of data it is handling.
> > 
> > For example, the two streams look like this:
> > OperatorA  ->  OperatorB  ->  OperatorC
> >     ^              ^              ^
> >     |              |              |
> >             BroadcastStream
> > 
> > If the broadcast stream does not exist, OperatorA, OperatorB and OperatorC 
> > are chained together in one slot because they have the same parallelism, 
> > which gives maximum performance.
> > 
> > I was wondering: if the broadcast stream exists, will it affect the 
> > performance? Or will Flink still chain them together to gain maximum 
> > performance? 
> > 
> > Thanks!
