Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

> On 6 Aug 2019, at 13:11, 黄兆鹏 <paulhu...@easyops.cn> wrote:
> 
> Hi, Piotrek,
> I previously considered your first advice(use union record type), but I found 
> that the schema would be only sent to one subtask of the operator(for 
> example, operatorA), and other subtasks of the operator are not aware of it. 
> In this case is there anything I have missed? 
> 
> Thank you!
> 
> 
> 
> 
> 
>  
> ------------------ Original ------------------
> From:  "Piotr Nowojski"<pi...@ververica.com>;
> Date:  Tue, Aug 6, 2019 06:57 PM
> To:  "黄兆鹏"<paulhu...@easyops.cn>;
> Cc:  "user"<user@flink.apache.org>;
> Subject:  Re: Will broadcast stream affect performance because of the absence 
> of operator chaining?
>  
> Hi,
> 
> Have you measured the performance impact of braking the operator chain?
> 
> This is a current limitation of Flink chaining, that if an operator has two 
> inputs, it can be chained to something else (only one input operators are 
> chained together). There are plans for the future to address this issue.
> 
> As a workaround, besides what you have mentioned:
> - maybe your record type can be a union: type of Record or Schema (not Record 
> AND Schema), and upstream operators (operatorA) could just ignore/forward the 
> Schema. You wouldn’t need to send schema with every record.
> - another (ugly) solution, is to implement BroadcastStream input outside of 
> Flink, but then you might have issues with checkpointing/watermarking and it 
> just makes many things more complicated.
> 
> Piotrek
> 
>> On 6 Aug 2019, at 10:50, 黄兆鹏 <paulhu...@easyops.cn 
>> <mailto:paulhu...@easyops.cn>> wrote:
>> 
>> Hi Piotrek,
>> Thanks for your reply, my broadcast stream just listen to the changes of the 
>> schema, and it's very infrequent and very lightweight.
>> 
>> In fact there are two ways to solve my problem,
>> 
>> the first one is a broadcast stream that listen to the change of the schema, 
>> and broadcast to every operator that will handle the data, just as I posted 
>> originally.
>> DataStream: OperatorA  ->  OperatorB  -> OperatorC
>>                           ^                   ^                      ^
>>                           |                    |                        |
>>                                   BroadcastStream
>> 
>> the second approach is that I have an operator that will join my data and 
>> schema together and send to the downstream operators:
>>  DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
>>                           ^                 
>>                           |                   
>>                 BroadcastStream           
>> 
>> 
>> The benefits of the first approach is that the flink job does not have to 
>> transfer the schema with the real data records among operators, because the 
>> schema will be broadcasted to each operator.
>> But the disadvantage of the first approache is that it breaks the operator 
>> chain, so operators may not be executed in the same slot and gain worse 
>> performance.
>> 
>> The second approach does not have the problem as the first one, but each 
>> message will carry its schema info among operators, it will cost about 2x 
>> for serialization and deserialization between operators.
>> 
>> Is there a better workaround that all the operators could notice the schema 
>> change and at the same time not breaking the operator chaining?
>> 
>> Thanks!
>> 
>>  
>>  
>> ------------------ Original ------------------
>> From:  "Piotr Nowojski"<pi...@ververica.com <mailto:pi...@ververica.com>>;
>> Date:  Tue, Aug 6, 2019 04:23 PM
>> To:  "黄兆鹏"<paulhu...@easyops.cn <mailto:paulhu...@easyops.cn>>;
>> Cc:  "user"<user@flink.apache.org <mailto:user@flink.apache.org>>;
>> Subject:  Re: Will broadcast stream affect performance because of the 
>> absence of operator chaining?
>>  
>> Hi,
>> 
>> Broadcasting will brake an operator chain. However my best guess is that 
>> Kafka source will be still a performance bottleneck in your job. Also 
>> Network exchanges add some measurable overhead only if your records are very 
>> lightweight and easy to process (for example if you are using RocksDB then 
>> you can just ignore network costs).
>> 
>> Either way, you can just try this out. Pre populate your Kafka topic with 
>> some significant number of messages, run both jobs, compare the throughput 
>> and decide based on those results wether this is ok for you or not.
>> 
>> Piotrek 
>> 
>> > On 6 Aug 2019, at 09:56, 黄兆鹏 <paulhu...@easyops.cn 
>> > <mailto:paulhu...@easyops.cn>> wrote:
>> > 
>> > Hi all, 
>> > My flink job has dynamic schema of data, so I want to consume a schema 
>> > kafka topic and try to broadcast to every operator so that each operator 
>> > could know what kind of data it is handling.
>> > 
>> > For example, the two streams just like this:
>> > OperatorA  ->  OperatorB  -> OperatorC
>> >       ^                   ^                      ^
>> >       |                    |                       |
>> >                BroadcastStream
>> > 
>> > If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC 
>> > are chained together in one slot because they have the same parallelism so 
>> > that it can gain maximum performance.
>> > 
>> > And I was wondering that if the broadcast stream exists, will it affect 
>> > the performance? Or flink will still chain them together to gain maximum 
>> > performance? 
>> > 
>> > Thanks!
> 

Reply via email to