Correct some thoughts in my following 2).

For handling `randomEmit` in BroadcastRecordWriter, it still faces the same 
mixed problem as Piotr mentioned before, and we may have to flush/finish the 
previous BufferBuilder in different mode. And it has the same considerations as 
my following 4).

So the determined improvement is just reducing the serialization times for 
multi channels in first step, that means serializing only once and copy many 
times to different BufferBuilders as Piotr suggested. Then we can consider how 
to avoid extra copy further based on the first step. Maybe we can think of 
other ways to improve copy or confirm finish non-full BufferBuilder no harms.

I think we can get good benefits from serialization cost and memory overhead 


发件人:Zhijiang(wangzhijiang999) <>
发送时间:2018年7月18日(星期三) 18:26
收件人:dev <>; Piotr Nowojski <>
主 题:回复:[DISCUSS] Improve broadcast serialization

Hi Piotr,

Your thoughts bring me more inspirations and possibilities.

1. I want to define a new AbstractRecordWriter as base class which defines some 
abstract methods and utility codes. The current RecordWriter used for other 
partitioner and new BroadcastRecordWriter used only for BroadcastPartitioner 
will both extend AbstractRecordWriter. The fields in BroadcastPartitioner are 
extactly as you showed below, but for current RecordWriter it also only needs 
one RecordSerializer if we make the RecordSerializer has no internal state.

2. You pointed the key problem that how to handle `randomEmit` in 
BroadcastRecordWriter, and I think this process may resue the `emit` logic in 
current RecordWriter. Then the `emit` and `broadcastEmit` logics in 
BroadcastRecordWriter will serialize data only once and copy to BufferBuilder 
only once. So this improvement is deterministic for BroadcastPartitioner.

3. As for `emit` improvement in RecordWriter for non-broadcast partitioner, if 
the record is emitted to multi channels(not only all channels), we can 
serialized data only once and then copy to multi BufferBuilder as you suggested 
in option [c]. I think this improvement is also deterministic for saving 
serialization cost.

4. As for 'broadcastEmit` improvement in RecordWriter for non-broadcast 
partitioner, we can also do as you suggested in option [2], but it has to 
finish/flush the previous BufferBuilder generated by common `emit` operation. 
So it may bring bad impacts on buffer utility which was improved well in 
event-driven flush feature. So I am not sure whether it is worth doing 
`broadcastEmit` improvement in RecordWriter.

5. The current StreamRecordWriter is redundant to some extent. It only 
maintains the `flusher` compared with RecordWriter, and this flusher can also 
be maintained by RecordWriter as well. Because the current `flushAlways` 
maintained by RecordWriter and `flusher` in StreamRecordWriter are both 
inferred by timeout parameter. But this may be out of the scope of 
serialization improvement. :)  The motivation isthe new BroadcastRecordWriter 
can used for both stream and batch job.

In summary,  the above 1-3 is clear to do from theory aspect. But considering 
specific implementation, we may need think more for graceful codes, especially 
for the above (2).

I already realized the demo covering above 1,2,5 before. I can create jiras 
after we reach a final agreement, then maybe you can help review PR if have 
time. :)


发件人:Piotr Nowojski <>
发送时间:2018年7月18日(星期三) 16:37
收件人:dev <>; Zhijiang(wangzhijiang999) 
主 题:Re: [DISCUSS] Improve broadcast serialization


Couple of more thoughts

a) I’m not sure if you would have to modify current RecordWriter at all. You 
could extract interface from current RecordWriter and just provide two 
implementations: current one and BroadcastRecordWriter. I’m not sure, but it 
doesn’t seem like they would duplicate/share lots of code. 
BroadcastRecordWriter would have fields:

private final RecordSerializer<T> serializers;

private final Optional<BufferBuilder> bufferBuilder;

Compared to RecordWriter’s arrays.

b) One thing that I noticed now are latency markers and randomEmit method. It 
prevents us from implementing option [1]. BroadcastRecordWriter would have to 
flush all channels on randomEmit (as I proposed in option [2]).

c) Another option to optimise broadcast writes (or for that matter all multi 
channel writes), would be to serialise record only once to 
SpanningRecordSerializer#serializationBuffer, but copy it multiple times to 
separate BufferBuilders. That would save us much more then half of the overhead 
(serialisation is more costly compared to data copying), while we would avoid 
problems with uneven state of channels. There would be no problems with mixed 
broadcast/non broadcast writes, this option could support both of them at the 
same time - in other words, it would be as generic as the current one.

d) Regarding StreamRecordWriter, other option is, that it could be refactored 
to a class implementing extracted RecordWriter interface and being a 
proxy/wrapper around another RecordWriter instance:

Class StreamRecordWriter implements RecordWriter {
  private final RecordWrtier recordWriter; //either broadcast or non broadcast 
  public void foo() {;

To be honest I’m not sure at the moment which one would be better [2] or [c]. 
In ideal world, we might want to replace current RecordWriter with [c] and 
after that (if that’s not enough) to implement [2] on top of [c]. 


> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) 
> <> wrote:
> Hi Piotr,
> Thanks for your replies and professional suggestions!
> My initial thought is just as you said in first suggestion. The current 
> RecordWriter will emit StreamRecord to some subpartition via ChannelSelector 
> or broadcast events/watermark to all subpartitions directly.
> If the ChannelSelector implementation is BroadcastPartitioner, then we can 
> create a specialized BroadcastRecordWriter to handle the 'emit', 
> 'broadcastEmit', 'broadcastEvent', etc.
> To make it seems not tricky, I want to abstract the RecordWriter as a plugin, 
> then implement a BroadcastRecordWriter and NonBroadcastRecordWriter 
> separately to extend abstract RecordWriter. That means we divide the 
> RecordWriter by ChannelSelector, and also we may remove current 
> StreamRecordWriter to uniform the RecordWriter criteria in both stream and 
> batch mode.
> Considering specific implementations, I think one RecordSerializer can work 
> for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the 
> precondition is making the RecordSerializer has no internal state, so we have 
> to remove the BufferBuilder variable from SpanningRecordSerializer and pass 
> it via addRecord/continueWritingWithNextBufferBuilder
> methods from RecordWriter. BroadcastRecordWriter only needs maintain one 
> BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need 
> maintain one BufferBuilder per subpartition.
> Another issue is whether this improvement is suitable for 
> broadcastEmit(watermark) in NonBroadcastRecordWriter as you said in 
> suggestion 2,3. I wonder it may decrease the buffer utilization if switch 
> between broadcast and non-broadcast modes, even it may seem more tricky in 
> implementation. I am still thinking of it.
> Maybe we can implement the improvement for BroadcastPartitioner in first step 
> and make sure one RecordSerializer for all subpartitions. That can reduce the 
> memory overhead in RecordSerializer and the time cost in broadcast 
> serialization scenarios.
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Piotr Nowojski <>
> 发送时间:2018年7月17日(星期二) 23:31
> 收件人:dev <>; Zhijiang(wangzhijiang999) 
> <>
> 主 题:Re: [DISCUSS] Improve broadcast serialization
> Hi
> Generally speaking this would be a nice optimisation, however it might be 
> tricky to implement. The thing to keep in mind is that currently interface 
> allow to interleave broadcasting and normal sending, because of that at any 
> given time some serialisers can have more data then others. For example when 
> we have two output channels and we are looping following writes:
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> Write sth to 1. Channel
> Broadcast to all channels
> (…)
> Thus buffers of different channels can fill out with different rates.
>> In theory every record can be serialized only once and referenced for all 
>> the subpartitions in broadcast mode.
> The problem here is that after records serialising, the only unit that can be 
> referenced afterwards is “Buffer”. So that would leave us now with couple of 
> options:
> 1. Create a specialised BroadcastRecordWriter that supports ONLY 
> broadcasting, guaranteeing that all channels always receive the same data. 
> Here you could serialise records only once, to one BufferBuilder that could 
> be shared and referenced by multiple BufferConsumers from different channels. 
> Any non broadcast write would have to fail.
> 2. Similar as above, but specialised in MOSTLY broadcasting. Operate as in 1. 
> for broadcasts, but for any non broadcast write: finish current broadcasting 
> BufferBuilder, flush all data on all channels, serialise single record to 
> single channel using newly create BufferBuilder and also immediately 
> finish/flush it, so that any subsequent broadcasts will work again as in 1.:
> 3. Similar as 2, but lazily switch between broadcasting and non-broadcasting 
> modes. It would have two modes of operating that could be switched back and 
> forth: the same as currently implemented for non-broadcasted and optimised 
> broadcast mode
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Broadcast to all channels
> Write sth to X Channel // this flushes all channels and clears/finishes 
> previous BufferBuilder 
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to Y Channel
> Write sth to X Channel 
> Broadcast to all channels // this flushes all channels and clears/finishes 
> previous BufferBuilders, 
> Broadcast to all channels
> Broadcast to all channels
> (…)
> However both in 2. and 3. there would be very big penalty for mixing 
> broadcast with normal writes.  
> Piotrek
>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
>> <> wrote:
>> Hi all,
>> In current implementation, the RecordSerializer is created separately for 
>> each subpartition in RecordWriter, that means the number of serializers 
>> equals to the number of subpartitions.
>> For broadcast partitioner, every record will be serialized many times in all 
>> the subpartitions, and this may bring bad performance to some extent.
>> In theory every record can be serialized only once and referenced for all 
>> the subpartitions in broadcast mode.
>> To do so, I propose the following changes:
>> 1. Create and maintain only one serializer in RecordWriter, and it will 
>> serialize the record for all the subpartitions. It makes sense for any 
>> partitioners, and the memory overhead can be also decreased, because every 
>> serializer will maintain some separate byte buffers internally.
>> 2. Maybe we can abstract the RecordWriter as a base class used for other 
>> partitioner mode and implement a BroadcastRecordWriter for 
>> BroadcastPartitioner. And this new implementation will add buffer references 
>> based on the number of subpartitions before adding into subpartition queue.
>> 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to 
>> RecordWriter, then the uniform RecordWriter can be used for both stream and 
>> batch. The above BroadcastRecordWriter can aslo uniform for both stream and 
>> batch.
>> I am not sure whether this improvement is proposed before and what do you 
>> think of it?
>> If necessary I can create JIRAs to contirbute it, and may need one commiter 
>> cooperate with me.
>> Best,
>> Zhijiang

Reply via email to