Sounds good to me :)


> On 19 Oct 2018, at 08:34, Zhijiang(wangzhijiang999) 
> <> wrote:
> I agree with the additional thoughts of a), b) and c).
> In all the current implementations of ChannelSelector, the selected channels 
> are either one or all, so it makes sense to change the interface as you 
> suggested, provided we do not add other selectors for partial channels in 
> future. The single channel implementation would also reduce some overhead in 
> arrays and loops. For the broadcast selector, there is no need to return 
> channels from the selector, and we can add a shortcut path for this special 
> implementation.
> Comparing 3 vs 5, I still prefer 3 currently, because it can reuse the current 
> network process. We create only one BufferBuilder for all the channels and 
> build a separate BufferConsumer for every channel, all sharing the same 
> BufferBuilder. To do so, we need just a few changes on the RecordWriter side 
> and do not touch the downstream components in the network stack. This already 
> gains most of the performance benefit, because the temporary serialization 
> buffer is copied only once, into one BufferBuilder.
> I can first create the JIRA for the single channel interface if you have not 
> done that already, and then continue with the copying step by step. :)
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <>
> Sent: Thursday, 18 October 2018, 17:47
> To: Zhijiang(wangzhijiang999) <>
> Cc: Nico Kruber <>; dev <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Hey,
> I also think that the 3rd option is the most promising, however the logic of 
> “dirty” channels might cause some overhead. I was also thinking about another 
> option:
> 5. In case of ‘emit’ called on BroadcastRecordWriter, we could write it to 
> the common/shared BufferBuilder, but somehow mark it as targeted to only one 
> channel - we would send it over the network to all of the receivers, but all 
> except one would ignore it. This might be easier to implement in 
> BroadcastRecordWriter, but would require extra logic on the receiver side. 
> With respect to performance it also might be better compared to 3.
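Option 5 can be pictured with a toy model: every write lands once in a shared buffer, tagged with its target, and each receiver drops entries addressed to other channels. This is only a sketch under stated assumptions; `TargetedBroadcastBuffer`, `Entry`, `ALL_CHANNELS` and `receive` are hypothetical names and do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of option 5. All names are hypothetical, not Flink classes.
final class TargetedBroadcastBuffer {
    static final int ALL_CHANNELS = -1;

    // One entry in the shared buffer: payload plus the channel it is meant for.
    private static final class Entry {
        final int targetChannel;
        final String payload;

        Entry(int targetChannel, String payload) {
            this.targetChannel = targetChannel;
            this.payload = payload;
        }
    }

    private final List<Entry> shared = new ArrayList<>();

    // Broadcast write: every receiver keeps it.
    void broadcastEmit(String payload) {
        shared.add(new Entry(ALL_CHANNELS, payload));
    }

    // Single-channel write: still stored once in the shared buffer, but tagged.
    void emit(String payload, int channel) {
        shared.add(new Entry(channel, payload));
    }

    // Receiver-side filter: each channel sees all entries and drops those tagged for others.
    List<String> receive(int channel) {
        List<String> out = new ArrayList<>();
        for (Entry e : shared) {
            if (e.targetChannel == ALL_CHANNELS || e.targetChannel == channel) {
                out.add(e.payload);
            }
        }
        return out;
    }
}
```

The cost this model makes visible is that a single-channel record still travels to every receiver, which is why it only pays off when such writes are rare.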
> A couple more thoughts:
> a) if we select BroadcastRecordWriter, literally the only way it can be 
> polluted by non-broadcast writes is latency markers via `randomEmit`. When 
> choosing 3 vs 5, mixing broadcast and non-broadcast happens very rarely, so 
> we shouldn’t optimise for it, but pick whatever is easiest to implement.
> b) there are no use cases where `ChannelSelector` returns anything else 
> besides a single channel or broadcast.
> Point b) brings me to one more thing. I was once playing with simplifying 
> the `ChannelSelector` interface by adding a new one, `SingleChannelSelector`, 
> with the method:
> `int selectChannel(T record, int numChannels);`
> It resulted in a ~10% performance speed-up for the network stack alone 
> (avoiding the overhead of creating singleton arrays and iterating over them). 
> I didn’t follow up on this, because the performance gain wasn’t super huge, 
> while it complicated `RecordWriter`, since it had to handle either 
> `SingleChannelSelector` or `ChannelSelector`. Now that I have realised that 
> there are no use cases for selecting more than one, but not all, of the 
> channels, and that if we go with broadcasting we will have to special-case 
> `BroadcastPartitioner` anyway, this is the perfect occasion to actually 
> simplify the implementation and drop the multi-channel ChannelSelector.
> I think we should do this as a first step, in preparation for either 3. 
> or 5. (changing the ChannelSelector signature to:
> int selectChannel(T record, int numChannels);
> )
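For illustration, the proposed signature could look roughly like the following. The interface name and method signature are taken from the message above; `HashSelector` and `RoundRobinSelector` are hypothetical implementations written only to show that no per-record array is needed, and are not Flink classes.

```java
import java.util.Objects;

// Sketch only: mirrors the signature proposed in the thread, not Flink's actual code.
interface SingleChannelSelector<T> {
    // Returns exactly one target channel index in [0, numChannels).
    int selectChannel(T record, int numChannels);
}

// Hypothetical selector: routes by hash, with no array allocation per record.
final class HashSelector<T> implements SingleChannelSelector<T> {
    @Override
    public int selectChannel(T record, int numChannels) {
        return Math.floorMod(Objects.hashCode(record), numChannels);
    }
}

// Hypothetical selector: cycles through the channels in order.
final class RoundRobinSelector<T> implements SingleChannelSelector<T> {
    private int next = -1;

    @Override
    public int selectChannel(T record, int numChannels) {
        next = (next + 1) % numChannels;
        return next;
    }
}
```

Returning a plain `int` avoids the singleton array and the loop over it that the multi-channel interface forces on every record.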
> What do you think?
> Piotrek
> On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999) 
> <> wrote:
> Hi Piotr,
> Thanks for your replies and suggestions!
> For my rough idea of the skip index list, I agree with your concerns about 
> performance in the non-broadcast case and the complicated implementation. 
> Although I think this idea is more unified in semantics for the "emit", 
> "broadcastEmit" and "randomEmit" APIs, it is probably not worth going deep 
> into it now, given the global changes involved.
> Currently RecordWriter provides three main methods to write elements with 
> different semantics:
> "broadcastEmit" writes the element to all the channels; currently used for 
> watermarks.
> "randomEmit" writes the element to one random channel; currently used for 
> latency markers.
> "emit" writes the element to some channels via ChannelSelector; currently 
> used for normal records. The selected channels may be one, some or all.
> If we want to retain these APIs for different requirements, then the 
> RecordWriter should not be aware of which kind of elements are written 
> via them, so we should not make any assumptions in the implementation. In 
> detail, I know that "randomEmit" is only used for latency markers currently, 
> but we cannot be sure whether this API will be used for other elements in 
> future, so we cannot estimate how frequently it will be called for 
> different possible elements, which is my concern above. I do not want this 
> improvement to limit any future possibilities for these APIs.
> Considering the suggestions below:
> 1. Inserting the elements written via "randomEmit" in front of the unfinished 
> broadcast buffer would change the current ordering semantics. It may not 
> matter for latency markers currently, but it may not extend to other elements 
> in future.
> 2. If we simply implement "randomEmit" as a broadcast, I am worried about a 
> broadcast storm in special cases, and we would also change the semantics by 
> sending unnecessary elements to some channels.
> 3. I prefer this way currently, and it is similar to our previous 
> discussion. The implementation is closer to the current "broadcastEvent", 
> which creates a new broadcast buffer for the event and finishes the current 
> buffer for all the channels before enqueuing this event buffer.
> 4. Yes, what you said is right for the current mode. I want to pass a boolean 
> parameter "isBroadcast" in the constructor of RecordWriter to indicate 
> broadcast writes in specific processes, because the RecordWriter cannot 
> check the ChannelSelector instance due to module dependencies.
> In conclusion, I want to implement this improvement based on the third point 
> according to my current thinking, which keeps the same behavior as normal 
> "emit" mixed with "broadcastEvent".
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <>
> Sent: Wednesday, 17 October 2018, 19:25
> To: Zhijiang(wangzhijiang999) <>
> Cc: Nico Kruber <>; dev <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Hi,
> Regarding the second idea with the skip index list, I would guess it might 
> have a bad performance impact in non-broadcasting cases or would seriously 
> complicate our Buffer implementation. Also it would make reading/data 
> copying/slicing and other big-chunk byte operations much more costly. Instead 
> of a memcpy of the whole buffer we would have to manually select the correct 
> bits.
>> But I am just wondering if the switch is frequent between broadcasting and 
>> non-broadcasting operations
> I haven't researched this topic any further than before. However my first 
> guess would be that this switch doesn’t happen at all EXCEPT for `randomEmit`, 
> which is used for the latency markers (this statement requires further 
> research/validation). Assuming that’s true:
> 1. Probably we can not/should not flush the broadcasted buffer, serialise 
> randomEmit and flush it again, because this would prematurely emit the 
> latency marker - defeating its purpose and skewing the measured time. 
> LatencyMarkers are expected to travel through the pipeline at the exact same 
> speed as regular records would.
> 2. Maybe we could just always broadcast the latency markers as well? This 
> would be a nice solution, except that at the level of RecordWriter we do not 
> know whether this is a latency marker or not - we know only that we were asked 
> to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them somehow 
> (throwing an exception?)
> 3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted 
> `BufferBuilder` into a new one and append the record/latency marker there, so 
> that all except one channel would share the “broadcasted” BufferBuilder. Once 
> we need to flush any of the buffers (either the broadcasted or the “dirty” 
> one) we flush them all and restart with all channels sharing a fresh new 
> “broadcasted” BufferBuilder? 
> 4. For streaming, isn't broadcast currently realised by passing 
> `BroadcastPartitioner` to the RecordWriter and using the standard `emit` 
> method? That would need to be changed/handled in order to optimise broadcast 
> writes.
> Piotrek
> On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999) 
> <> wrote:
> Let's come back to this discussion again.
> Thanks to @Nico Kruber and @Piotr Nowojski for reviewing the PR for the 
> proposed 2.1 (serializing only once) below; it has already been merged.
> For the proposed 2.2 (copying only once), we verified in our benchmark that it 
> is also very important for batch jobs with join operations, so we want to 
> focus on improving it on top of 2.1.
> Following the initial conclusion, we want to finish the current shared 
> broadcast BufferBuilder when a non-broadcast operation is triggered, which 
> will request a new separate BufferBuilder. But I am just wondering: if the 
> switch is frequent between broadcasting and non-broadcasting operations, the 
> BufferBuilder may be filled with little data, resulting in low resource 
> utilization, which may cause a regression in special cases. And as long as one 
> channel has not consumed the data via network transfer, this shared 
> BufferBuilder cannot be recycled.
> Another raw idea: we could let one BufferBuilder shared by all the 
> channels hold different data regions for different partition indexes, which 
> means one subpartition has a non-contiguous data distribution in the 
> BufferBuilder. We can use a skip index list to identify which regions belong 
> to a specific subpartition, which avoids finishing the BufferBuilder on a mode 
> switch. But it seems more complicated than the current process, and I am not 
> sure whether it raises other performance concerns.
> Any feedback on this issue is welcome, so that I can focus on it further. 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Zhijiang(wangzhijiang999) <>
> Sent: Friday, 20 July 2018, 13:21
> To: Piotr Nowojski <>
> Cc: Nico Kruber <>; dev <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Ok, that is fine. :)
> I will create the JIRA today and submit the PR next week.
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <>
> Sent: Thursday, 19 July 2018, 17:52
> To: Zhijiang(wangzhijiang999) <>
> Cc: Nico Kruber <>; dev <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Hi,
> I only noticed your second response after sending my email :) 
> Ok, now I think we are on the same page :) I think you can work on 2.1 and 
> later on 2.2 if you think that 2.1 is not enough. Once you create the Jira 
> issues/PRs, please CC me.
> Piotrek  
> On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999) 
> <> wrote:
> Hi Piotr
> 1. I agree that we should discuss the higher level first and leave 
> implementation details to the jira/pr. As long as RecordSerializer does not 
> maintain the BufferBuilder, it can become stateless, and then it can get the 
> BufferBuilder from the RecordWriter at any time. I think this is the 
> precondition for serializing only once for multiple channels; otherwise we 
> have to select the serializer based on the target channel index.
> 2. I already corrected this thought in my last reply; maybe you had not seen 
> it before you replied. :)  
> We can break the broadcast improvement into two steps:
> 2.1 Serialize the record into a temporary byte buffer only once for multiple 
> selected channels. (currently serialized many times)
> 2.2 Copy the temporary byte buffer into the BufferBuilder only once and create 
> different BufferConsumers based on the same BufferBuilder for each channel. 
> (currently copied many times)
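Step 2.2 can be pictured as one writer-side memory region with an independent read cursor per channel. The sketch below is a toy model under that assumption; `SharedBuilder`, `ChannelConsumer`, `append` and `poll` are hypothetical names chosen for illustration and are not the signatures of Flink's `BufferBuilder`/`BufferConsumer`.

```java
// Toy model of step 2.2: one shared builder, one read cursor per channel.
// SharedBuilder and ChannelConsumer are hypothetical stand-ins, not Flink classes.
final class SharedBuilder {
    private final byte[] memory;
    private int writerIndex = 0;
    int copies = 0; // number of copy operations into the shared memory

    SharedBuilder(int capacity) {
        this.memory = new byte[capacity];
    }

    // Copies as many bytes as fit; returns the number of bytes written.
    int append(byte[] data) {
        int n = Math.min(data.length, memory.length - writerIndex);
        System.arraycopy(data, 0, memory, writerIndex, n);
        writerIndex += n;
        copies++;
        return n;
    }

    ChannelConsumer createConsumer() {
        return new ChannelConsumer();
    }

    // Each consumer reads the same memory but tracks its own position.
    final class ChannelConsumer {
        private int readerIndex = 0;

        // Returns the bytes written since this consumer's previous poll.
        byte[] poll() {
            byte[] out = new byte[writerIndex - readerIndex];
            System.arraycopy(memory, readerIndex, out, 0, out.length);
            readerIndex = writerIndex;
            return out;
        }
    }
}
```

With N channels, a record is copied into the shared memory once instead of N times, while each channel can still consume at its own pace.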
> Regarding 2.1, which is the same as your proposal [c], it is worth doing now 
> and should bring good benefits, I think.
> Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to 
> flush/finish the last broadcast BufferBuilder for the current non-broadcast 
> write and vice versa. I agree with your proposal [2] for this issue, and we 
> can consider it further in future; maybe there are better ways of avoiding 
> it.
> 4. My previous thought was to realize both 2.1 and 2.2 above. 2.1 is your 
> proposal [c], which has no problem with mixed write mode, so no additional 
> flush is needed. 2.2 is just your proposal [2], which involves the additional 
> flush. Maybe my last reply caused the misunderstanding.
> I can submit a jira for 2.1 above first if there are no other concerns. Thanks 
> for the helpful advice. :)
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <>
> Sent: Wednesday, 18 July 2018, 20:04
> To: Zhijiang(wangzhijiang999) <>; Nico Kruber <>
> Cc: dev <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Hi 
> 1. I want to define a new AbstractRecordWriter as a base class which defines 
> some abstract methods and utility code. The current RecordWriter, used for 
> other partitioners, and the new BroadcastRecordWriter, used only for 
> BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in 
> BroadcastPartitioner are exactly as you showed below, but the current 
> RecordWriter also needs only one RecordSerializer if we make the 
> RecordSerializer stateless.
> Let's first discuss what we would like to have/implement on a higher level and 
> later focus on implementation details. Regarding making RecordSerializer 
> stateless, there were some discussions about it previously and it was on our 
> TODO list but I don’t remember what was holding us back. Maybe Nico will 
> remember?
> 2. You pointed out the key problem of how to handle `randomEmit` in 
> BroadcastRecordWriter, and I think this process may reuse the `emit` logic in 
> the current RecordWriter. Then the `emit` and `broadcastEmit` logic in 
> BroadcastRecordWriter will serialize data only once and copy to BufferBuilder 
> only once. So this improvement is deterministic for BroadcastPartitioner.
> What logic to reuse do you have in mind? 
> 4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast 
> partitioners, we can also do as you suggested in option [2], but it has to 
> finish/flush the previous BufferBuilder generated by the common `emit` 
> operation. So it may hurt buffer utilization, which was improved a lot by the 
> event-driven flush feature. So I am not sure whether it is worth doing the 
> `broadcastEmit` improvement in RecordWriter.
> The whole point of my proposal [c] was to avoid the need to flush. Code would 
> need a little bit more refactoring but it should look something like this:
> void broadcastEmit(record):
>     serializedRecord = serializer.serialize(record)
>     for bufferBuilder in bufferBuilders:
>         bufferBuilder.append(serializedRecord)
>         // if we overfilled bufferBuilder, finish it, request new one and 
>         // continue writing
> void emit(record, channel):
>     serializedRecord = serializer.serialize(record)
>     bufferBuilders[channel].append(serializedRecord)
>     // if we overfilled bufferBuilder, finish it, request new one and continue 
>     // writing
> I do not see a need for additional flushes here, and it should be a strict 
> improvement over the current code base.
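A runnable version of that pseudocode might look like the sketch below. It is a simplified model under stated assumptions: records are plain strings, each channel's buffer is a fixed-size byte array, and `SerializeOnceWriter` with its fields and helper methods is a hypothetical stand-in rather than Flink's `RecordWriter` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of proposal [c]: serialize once, then copy the bytes into each channel's buffer.
// All names are illustrative stand-ins, not Flink's RecordWriter API.
final class SerializeOnceWriter {
    private final int bufferSize;
    private final List<List<byte[]>> finished = new ArrayList<>(); // finished buffers per channel
    private final byte[][] current; // buffer currently being filled, per channel
    private final int[] positions;  // write position inside each current buffer
    int serializations = 0;         // counts calls to serialize()

    SerializeOnceWriter(int numChannels, int bufferSize) {
        this.bufferSize = bufferSize;
        this.current = new byte[numChannels][bufferSize];
        this.positions = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            finished.add(new ArrayList<>());
        }
    }

    private byte[] serialize(String record) {
        serializations++;
        return record.getBytes(StandardCharsets.UTF_8);
    }

    void broadcastEmit(String record) {
        byte[] bytes = serialize(record); // serialized exactly once
        for (int ch = 0; ch < current.length; ch++) {
            append(ch, bytes);            // copied once per channel
        }
    }

    void emit(String record, int channel) {
        append(channel, serialize(record));
    }

    // Copies bytes into the channel's buffer; a full buffer is finished and replaced.
    private void append(int ch, byte[] bytes) {
        int offset = 0;
        while (offset < bytes.length) {
            int n = Math.min(bufferSize - positions[ch], bytes.length - offset);
            System.arraycopy(bytes, offset, current[ch], positions[ch], n);
            positions[ch] += n;
            offset += n;
            if (positions[ch] == bufferSize) {
                finished.get(ch).add(current[ch]);
                current[ch] = new byte[bufferSize];
                positions[ch] = 0;
            }
        }
    }

    int finishedBuffers(int ch) { return finished.get(ch).size(); }

    int bytesInCurrent(int ch) { return positions[ch]; }
}
```

Because every channel keeps its own buffer, broadcast and single-channel writes can be interleaved freely; the only saving compared with the existing code is the repeated serialization.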
> I already implemented a demo covering 1, 2 and 5 above. I can create jiras 
> after we reach a final agreement, then maybe you can help review the PR if you 
> have time. :)
> Sure :)
> Piotrek
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <>
> Sent: Wednesday, 18 July 2018, 16:37
> To: dev <>; Zhijiang(wangzhijiang999) <>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> Hi,
> A couple more thoughts:
> a) I’m not sure if you would have to modify the current RecordWriter at all. 
> You could extract an interface from the current RecordWriter and just provide 
> two implementations: the current one and BroadcastRecordWriter. I’m not sure, 
> but it doesn’t seem like they would duplicate/share lots of code. 
> BroadcastRecordWriter would have fields:
> private final RecordSerializer<T> serializer;
> private final Optional<BufferBuilder> bufferBuilder;
> Compared to RecordWriter’s arrays.
> b) One thing that I have noticed now is latency markers and the randomEmit 
> method. It prevents us from implementing option [1]. BroadcastRecordWriter 
> would have to flush all channels on randomEmit (as I proposed in option [2]).
> c) Another option to optimise broadcast writes (or for that matter all multi 
> channel writes) would be to serialise the record only once to 
> SpanningRecordSerializer#serializationBuffer, but copy it multiple times to 
> separate BufferBuilders. That would save us much more than half of the 
> overhead (serialisation is more costly compared to data copying), while we 
> would avoid problems with the uneven state of channels. There would be no 
> problems with mixed broadcast/non-broadcast writes; this option could support 
> both of them at the same time - in other words, it would be as generic as the 
> current one.
> d) Regarding StreamRecordWriter, another option is that it could be refactored 
> into a class implementing the extracted RecordWriter interface and acting as a 
> proxy/wrapper around another RecordWriter instance:
> class StreamRecordWriter implements RecordWriter {
>     private final RecordWriter recordWriter; // either broadcast or non-broadcast
>     public void foo() {
>     }
> }
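The wrapper idea in d) could be fleshed out along these lines. This is a sketch under assumptions: the two-method `RecordWriter` interface, the `flushAlways` flag and `LoggingRecordWriter` are all hypothetical and only stand in for whatever the extracted interface would really contain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical extracted interface; the real RecordWriter is a class with more methods.
interface RecordWriter<T> {
    void emit(T record);

    void flush();
}

// Stand-in delegate that just records what it was asked to do.
final class LoggingRecordWriter<T> implements RecordWriter<T> {
    final List<String> log = new ArrayList<>();

    @Override
    public void emit(T record) {
        log.add("emit:" + record);
    }

    @Override
    public void flush() {
        log.add("flush");
    }
}

// Wrapper in the spirit of option d): layers flushing behaviour over any
// RecordWriter, broadcast or non-broadcast. The flushAlways flag is an assumption.
final class StreamRecordWriter<T> implements RecordWriter<T> {
    private final RecordWriter<T> recordWriter;
    private final boolean flushAlways;

    StreamRecordWriter(RecordWriter<T> recordWriter, boolean flushAlways) {
        this.recordWriter = recordWriter;
        this.flushAlways = flushAlways;
    }

    @Override
    public void emit(T record) {
        recordWriter.emit(record);
        if (flushAlways) {
            recordWriter.flush();
        }
    }

    @Override
    public void flush() {
        recordWriter.flush();
    }
}
```

Composition keeps the flushing policy independent of the broadcast/non-broadcast choice, so neither writer needs a streaming-specific subclass.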
> To be honest I’m not sure at the moment which one would be better, [2] or [c]. 
> In an ideal world, we might want to replace the current RecordWriter with [c] 
> and after that (if that’s not enough) implement [2] on top of [c]. 
> Piotrek
>> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) 
>> <> wrote:
>> Hi Piotr,
>> Thanks for your replies and professional suggestions!
>> My initial thought is just as you said in your first suggestion. The current 
>> RecordWriter will emit StreamRecord to some subpartition via ChannelSelector 
>> or broadcast events/watermarks to all subpartitions directly.
>> If the ChannelSelector implementation is BroadcastPartitioner, then we can 
>> create a specialized BroadcastRecordWriter to handle 'emit', 
>> 'broadcastEmit', 'broadcastEvent', etc.
>> To make it less tricky, I want to abstract the RecordWriter as a 
>> plugin, then implement a BroadcastRecordWriter and NonBroadcastRecordWriter 
>> separately, both extending the abstract RecordWriter. That means we divide the 
>> RecordWriter by ChannelSelector, and we may also remove the current 
>> StreamRecordWriter to unify the RecordWriter across stream and 
>> batch mode.
>> Considering specific implementations, I think one RecordSerializer can work 
>> for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the 
>> precondition is making the RecordSerializer stateless, so we 
>> have to remove the BufferBuilder variable from SpanningRecordSerializer and 
>> pass it via the addRecord/continueWritingWithNextBufferBuilder
>> methods from RecordWriter. BroadcastRecordWriter only needs to maintain one 
>> BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need 
>> to maintain one BufferBuilder per subpartition.
>> Another issue is whether this improvement is suitable for 
>> broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in 
>> suggestions 2 and 3. I suspect it may decrease the buffer utilization if we 
>> switch between broadcast and non-broadcast modes, and it may also be more 
>> tricky to implement. I am still thinking about it.
>> Maybe we can implement the improvement for BroadcastPartitioner as a first 
>> step and ensure there is one RecordSerializer for all subpartitions. That can 
>> reduce the memory overhead in RecordSerializer and the time cost in 
>> broadcast serialization scenarios.
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <>
>> Sent: Tuesday, 17 July 2018, 23:31
>> To: dev <>; Zhijiang(wangzhijiang999) <>
>> Subject: Re: [DISCUSS] Improve broadcast serialization
>> Hi
>> Generally speaking this would be a nice optimisation, however it might be 
>> tricky to implement. The thing to keep in mind is that the current interface 
>> allows interleaving broadcasting and normal sending; because of that, at any 
>> given time some serialisers can have more data than others. For example when 
>> we have two output channels and we are looping over the following writes:
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> (…)
>> Thus buffers of different channels can fill up at different rates.
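The uneven fill rate in that loop is easy to quantify with a tiny simulation. The class and method names below are made up for illustration, and a fixed record size is assumed.

```java
// Illustrative simulation of the write pattern above; names are hypothetical.
final class UnevenFill {
    // Returns the bytes pending per channel after `rounds` repetitions of:
    // write one record to the first channel, then broadcast one record to all.
    static int[] simulate(int numChannels, int rounds, int recordSize) {
        int[] pending = new int[numChannels];
        for (int r = 0; r < rounds; r++) {
            pending[0] += recordSize;          // write to the first channel only
            for (int ch = 0; ch < numChannels; ch++) {
                pending[ch] += recordSize;     // broadcast to all channels
            }
        }
        return pending;
    }
}
```

With two channels, the first channel accumulates data twice as fast as the second, so their buffers cannot stay aligned on the same `Buffer` boundaries.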
>>> In theory every record can be serialized only once and referenced for all 
>>> the subpartitions in broadcast mode.
>> The problem here is that after a record is serialised, the only unit that can 
>> be referenced afterwards is “Buffer”. So that would leave us now with a couple 
>> of options:
>> 1. Create a specialised BroadcastRecordWriter that supports ONLY 
>> broadcasting, guaranteeing that all channels always receive the same data. 
>> Here you could serialise records only once, to one BufferBuilder that could 
>> be shared and referenced by multiple BufferConsumers from different 
>> channels. Any non-broadcast write would have to fail.
>> 2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in 
>> 1. for broadcasts, but for any non-broadcast write: finish the current 
>> broadcasting BufferBuilder, flush all data on all channels, serialise the 
>> single record to a single channel using a newly created BufferBuilder and also 
>> immediately finish/flush it, so that any subsequent broadcasts will work 
>> again as in 1.
>> 3. Similar to 2, but lazily switch between broadcasting and non-broadcasting 
>> modes. It would have two modes of operating that could be switched back and 
>> forth: the same as currently implemented for non-broadcast writes, and an 
>> optimised broadcast mode:
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Write sth to X Channel // this flushes all channels and clears/finishes 
>> previous BufferBuilder 
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to X Channel 
>> Broadcast to all channels // this flushes all channels and clears/finishes 
>> previous BufferBuilders
>> Broadcast to all channels
>> Broadcast to all channels
>> (…)
>> However, in both 2. and 3. there would be a very big penalty for mixing 
>> broadcast with normal writes.  
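To get a rough feel for how options 2 and 3 differ, the flushes can be counted for a given write sequence. The model below is a deliberate simplification and an assumption, not measured behaviour: `'B'` stands for a broadcast write, any other character for a single-channel write, option 2 is charged two finish/flush steps per non-broadcast write, and option 3 one per mode change. `ModeSwitchCounter` and its methods are hypothetical names.

```java
// Simplified counting model for options 2 and 3; not taken from Flink.
final class ModeSwitchCounter {
    private static boolean isBroadcast(char write) {
        return write == 'B';
    }

    // Option 2: each non-broadcast write finishes/flushes the broadcast builder
    // and then immediately finishes/flushes its own builder (counted as 2).
    static int flushesEager(String writes) {
        int flushes = 0;
        for (char w : writes.toCharArray()) {
            if (!isBroadcast(w)) {
                flushes += 2;
            }
        }
        return flushes;
    }

    // Option 3: flush only when the mode changes between consecutive writes.
    static int flushesLazy(String writes) {
        int flushes = 0;
        for (int i = 1; i < writes.length(); i++) {
            if (isBroadcast(writes.charAt(i - 1)) != isBroadcast(writes.charAt(i))) {
                flushes++;
            }
        }
        return flushes;
    }
}
```

For the sequence sketched in option 3, encoded as `"BBBBXYYYXBBB"`, this model counts 10 flushes for option 2 and 2 for option 3; for a strictly alternating sequence such as `"BXBXBX"` the two are nearly equal (6 versus 5), which is the mixing penalty described above.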
>> Piotrek
>>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
>>> <> wrote:
>>> Hi all,
>>> In the current implementation, the RecordSerializer is created separately 
>>> for each subpartition in RecordWriter, which means the number of serializers 
>>> equals the number of subpartitions.
>>> For the broadcast partitioner, every record will be serialized many times, 
>>> once for each subpartition, and this may hurt performance to some extent.
>>> In theory every record can be serialized only once and referenced for all 
>>> the subpartitions in broadcast mode.
>>> To do so, I propose the following changes:
>>> 1. Create and maintain only one serializer in RecordWriter, and it will 
>>> serialize the record for all the subpartitions. It makes sense for any 
>>> partitioner, and the memory overhead can also be decreased, because every 
>>> serializer maintains some separate byte buffers internally.
>>> 2. Maybe we can abstract the RecordWriter as a base class used for the other 
>>> partitioner modes and implement a BroadcastRecordWriter for 
>>> BroadcastPartitioner. This new implementation will add buffer 
>>> references based on the number of subpartitions before adding the buffer to 
>>> the subpartition queues.
>>> 3. Maybe we can remove StreamRecordWriter by migrating the flusher from it to 
>>> RecordWriter, then the unified RecordWriter can be used for both stream and 
>>> batch. The above BroadcastRecordWriter can also be unified for both stream 
>>> and batch.
>>> I am not sure whether this improvement has been proposed before. What do you 
>>> think of it?
>>> If necessary I can create JIRAs to contribute it, and I may need one 
>>> committer to cooperate with me.
>>> Best,
>>> Zhijiang
