Sounds good to me :)

Piotrek
> On 19 Oct 2018, at 08:34, Zhijiang(wangzhijiang999)
> <[email protected]> wrote:
>
> I agree with the additional thoughts in a), b) and c).
>
> In all the current implementations of ChannelSelector, the selected channels
> are either one or all, so it makes sense to change the interface as you
> suggested, provided we do not add other selectors for partial channels in
> future. The single channel implementation would also remove some overhead
> from arrays and loops. For the broadcast selector, there is no need to return
> channels from the selector at all, and we can add a shortcut for this special
> implementation.
>
> Comparing 3 vs 5, I still prefer 3 currently, since it can reuse the current
> network process. We only create one BufferBuilder for all the channels and
> build a separate BufferConsumer for every channel, all sharing the same
> BufferBuilder. To do so, we just need a few changes on the RecordWriter side
> and do not touch the following components in the network stack. It will
> already gain most of the performance benefit, because the temporary
> serialization buffer is copied only once, into one BufferBuilder.
>
> I can first create the JIRA for the single channel interface if you have not
> done that already, and then continue with the copying step by step. :)
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Thursday, 18 Oct 2018, 17:47
> To: Zhijiang(wangzhijiang999) <[email protected]>
> Cc: Nico Kruber <[email protected]>; dev <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hey,
>
> I also think that the 3rd option is the most promising, however the logic of
> “dirty” channels might cause some overhead. I was also thinking about
> another option:
>
> 5.
> In case of ‘emit’ called on BroadcastRecordWriter, we could write it to the
> common/shared BufferBuilder, but somehow mark it as targeted at only one
> channel - we would send it over the network to all of the receivers, but all
> except one would ignore it. This might be easier to implement in
> BroadcastRecordWriter, but would require extra logic on the receiver side.
> With respect to performance it also might be better compared to 3.
>
> A couple more thoughts:
>
> a) if we select BroadcastRecordWriter, literally the only way it can be
> polluted by non broadcast writes is latency markers via `randomEmit`. When
> choosing 3 vs 5, mixing broadcast and non broadcast happens very rarely, so
> we shouldn’t optimise for it, but pick whatever is easiest to implement.
> b) there are no use cases where `ChannelSelector` returns anything
> besides a single channel or broadcast.
>
> Point b) brings me to one more thing. I was once playing with simplifying the
> `ChannelSelector` interface by adding a new one, `SingleChannelSelector`, with
> the method:
>
> `int selectChannel(T record, int numChannels);`
>
> And it resulted in a ~10% speed up for the network stack alone (the overhead
> of creating singleton arrays and iterating over them). I didn’t follow up on
> this, because the performance gain wasn’t super huge, while it complicated
> `RecordWriter`, since it had to handle both `SingleChannelSelector` and
> `ChannelSelector`. Now that I have realised that there are no use cases for
> selecting more than one, but not all of the channels, and that whichever way
> we go with broadcasting we will have to special-case `BroadcastPartitioner`,
> that’s the perfect occasion to actually simplify the implementation and drop
> this multi channel ChannelSelector.
>
> I think we should do this as a first step, in preparation for either 3.
> or 5. (changing the ChannelSelector signature to:
>
> int selectChannel(T record, int numChannels);
>
> )
>
> What do you think?
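
The interface change proposed above can be sketched roughly as follows. This is only an illustration under assumptions: `SingleChannelSelector` and `selectChannel(T record, int numChannels)` come from the message, while `MultiChannelSelector`, `selectChannels`, `HashSelector`, `ArrayAdapter` and `SelectorDemo` are hypothetical stand-ins and not Flink classes.

```java
import java.util.Arrays;

/** Small driver comparing the two selector shapes (hypothetical names throughout). */
final class SelectorDemo {
    public static void main(String[] args) {
        SingleChannelSelector<String> single = new HashSelector<>();
        MultiChannelSelector<String> multi = new ArrayAdapter<>(single);
        int direct = single.selectChannel("key-a", 4);
        int[] viaArray = multi.selectChannels("key-a", 4);
        // Both paths pick the same channel; only the array path allocates.
        System.out.println(direct + " " + Arrays.toString(viaArray));
    }
}

/** Stand-in for an array-returning selector (assumed shape, for comparison only). */
interface MultiChannelSelector<T> {
    int[] selectChannels(T record, int numChannels);
}

/** The single-channel shape quoted in the message: one index, no array. */
interface SingleChannelSelector<T> {
    int selectChannel(T record, int numChannels);
}

/** Hypothetical hash-based selector written against the single-channel shape. */
final class HashSelector<T> implements SingleChannelSelector<T> {
    @Override
    public int selectChannel(T record, int numChannels) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(record.hashCode(), numChannels);
    }
}

/** Adapter that shows the singleton array the array-returning shape forces per record. */
final class ArrayAdapter<T> implements MultiChannelSelector<T> {
    private final SingleChannelSelector<T> delegate;

    ArrayAdapter(SingleChannelSelector<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public int[] selectChannels(T record, int numChannels) {
        return new int[] {delegate.selectChannel(record, numChannels)};
    }
}
```

With the single-channel shape a writer can index its per-channel state directly; broadcast would then be handled by a separate code path rather than by returning every channel index.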
>
> Piotrek
>
> On 18 Oct 2018, at 06:12, Zhijiang(wangzhijiang999)
> <[email protected]> wrote:
> Hi Piotr,
>
> Thanks for your replies and suggestions!
>
> Regarding my rough idea of a skip index list, I agree with your concerns about
> performance in the non-broadcast case and the complicated implementation.
> Although I think this idea is more unified in semantics for the "emit",
> "broadcastEmit" and "randomEmit" APIs, maybe it is not worth going deep into
> it currently, given the global changes it needs.
>
> Currently RecordWriter provides three main methods to write elements with
> different semantics:
>
> "broadcastEmit" writes the element to all the channels, currently used for
> watermarks.
> "randomEmit" writes the element to one random channel, currently used for
> latency markers.
> "emit" writes the element to some channels via ChannelSelector, currently used
> for normal records. The selected channels may be one, some or all.
>
> If we want to retain these APIs for different requirements, then the
> RecordWriter should not be aware of which kind of elements are written
> via them, so we should not make any assumptions in the implementation. In
> detail, I know that "randomEmit" is only used for latency markers currently,
> but we cannot be sure whether this API will be used for other elements in
> future, so we cannot estimate how frequently it will be called for
> different possible elements, which is my concern above. I do not want this
> improvement to limit any future possibilities for these APIs.
>
> Considering the suggestions below:
>
> 1. Inserting the elements written via "randomEmit" in front of the unfinished
> broadcast buffer would change the current ordering semantics. It may not
> matter for latency markers currently, but it may not extend to other elements
> in future.
>
> 2.
> If we simply implement "randomEmit" as a broadcast, I am worried about
> a broadcast storm in special cases, and we would also change the
> semantics by sending unnecessary elements to some channels.
>
> 3. I prefer this way currently and it is similar to our previous
> discussion. The implementation is closer to the current
> "broadcastEvent", which creates a new broadcast buffer for the event and
> finishes the current buffer for all the channels before enqueuing this event
> buffer.
>
> 4. Yes, what you say is right for the current mode. I want to pass a boolean
> parameter "isBroadcast" to the constructor of RecordWriter to indicate
> broadcast writes in specific processes, because the RecordWriter cannot
> check the ChannelSelector instance due to module dependencies.
>
> In conclusion, I want to implement this improvement based on the third point
> from the current thinking, which keeps the same behavior as normal "emit"
> mixed with "broadcastEvent".
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Wednesday, 17 Oct 2018, 19:25
> To: Zhijiang(wangzhijiang999) <[email protected]>
> Cc: Nico Kruber <[email protected]>; dev <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hi,
>
> Regarding the second idea with the skip index list, I would guess it might
> have a bad performance impact in non broadcasting cases or would seriously
> complicate our Buffer implementation. Also it would make reading/data
> copying/slicing and other big chunk byte operations much more costly. Instead
> of a memcpy of the whole buffer we would have to manually select the correct
> bits.
>
>> But I am just wondering if the switch is frequent between broadcasting and
>> non-broadcasting operations
>
> I haven't researched this topic any further than before.
> However my first
> guess would be that this switch doesn’t happen at all EXCEPT for `randomEmit`,
> which is used for the latency markers (this statement requires further
> research/validation). Assuming that’s true:
>
> 1. Probably we cannot/should not flush the broadcasted buffer, serialise
> randomEmit and flush it again, because this would prematurely emit the latency
> marker - defeating its purpose and skewing the measured time. LatencyMarkers
> are expected to travel through the pipeline at the exact same speed as regular
> records would.
>
> 2. Maybe we could just always broadcast the latency markers as well? This
> would be a nice solution, except that at the level of RecordWriter we do not
> know whether this is a latency marker or not - we know only that we were
> asked to “emit”, “randomEmit” or “broadcastEmit” and we have to handle them
> somehow (throwing an exception?)
>
> 3. Assuming `randomEmit` or `emit` is rare, maybe we copy the broadcasted
> `BufferBuilder` into a new one and append the record/latency marker there, so
> all except one channel would share the “broadcasted” BufferBuilder. Once we
> need to flush any of the buffers (either the broadcasted or the “dirty” one)
> we flush them all and restart with all channels sharing a fresh new
> “broadcasted” BufferBuilder?
>
> 4. For streaming, isn't Broadcast currently realised via passing
> `BroadcastPartitioner` to the RecordWriter and using the standard `emit`
> method? That would need to be changed/handled in order to optimise broadcast
> writes.
>
> Piotrek
>
> On 15 Oct 2018, at 12:02, Zhijiang(wangzhijiang999)
> <[email protected]> wrote:
> Let's come back to this discussion again.
>
> Thanks to @Nico Kruber and @Piotr Nowojski for reviewing the PR for the
> proposed 2.1 (serialization only once, see below); it is already merged into
> the branch.
>
> For the proposed 2.2 (copy only once), we verified in our benchmark that it
> is also very important for batch jobs with join operations, so we want to
> focus on improving it on top of 2.1.
> Following the initial conclusion from before, we want to finish the current
> shared broadcasting BufferBuilder when a non-broadcast operation is triggered,
> which will request a new separate BufferBuilder. But I am just wondering: if
> the switch between broadcasting and non-broadcasting operations is frequent,
> the BufferBuilder may be filled with little data, resulting in low resource
> utilization, which may cause a regression in special cases. And as long as one
> channel has not consumed the data via network transfer, this shared
> BufferBuilder cannot be recycled.
>
> Another raw idea: if we support one BufferBuilder shared by all the
> channels with different data regions for different partition indexes, then
> one subpartition has a non-contiguous data distribution in the
> BufferBuilder. We can use a skip index list to identify which regions belong
> to a specific subpartition, which avoids finishing the BufferBuilder during a
> mode switch. But it seems more complicated than the current process and I am
> not sure whether it has other performance concerns.
>
> I welcome any feedback on this issue, then I can focus on it further.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Zhijiang(wangzhijiang999) <[email protected]>
> Sent: Friday, 20 Jul 2018, 13:21
> To: Piotr Nowojski <[email protected]>
> Cc: Nico Kruber <[email protected]>; dev <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Ok, that is fine. :)
>
> I will create the JIRA today and submit the PR next week.
>
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Thursday, 19 Jul 2018, 17:52
> To: Zhijiang(wangzhijiang999) <[email protected]>
> Cc: Nico Kruber <[email protected]>; dev <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hi,
>
> I only noticed your second response after sending my email :)
>
> Ok, now I think we are on the same page :) I think you can work on 2.1 and
> later on 2.2 if you think that 2.1 is not enough. Once you create the Jira
> issues/PRs please CC me.
>
> Piotrek
>
> On 19 Jul 2018, at 04:51, Zhijiang(wangzhijiang999)
> <[email protected]> wrote:
> Hi Piotr
>
> 1. I agree that we should discuss the higher level first and focus on
> implementation in the jira/pr. As long as RecordSerializer does not maintain
> the BufferBuilder, it can become stateless, and then it can get the
> BufferBuilder from the RecordWriter at any time. I think this is the
> precondition for serializing only once for multiple channels, otherwise we
> have to select the serializer based on the target channel index.
>
> 2. I already corrected this thought in my last reply, maybe you had not seen
> it before you replied. :)
> We can break the broadcast improvement into two steps:
> 2.1 Serialize the record into the temporary byte buffer only once for multiple
> selected channels. (currently serialized many times)
> 2.2 Copy the temporary byte buffer into a BufferBuilder only once and create
> different BufferConsumers based on the same BufferBuilder for each channel.
> (currently copied many times)
> Regarding 2.1, it is just the same as your proposal[c]; it is worth doing
> now and I think it brings good benefits.
> Regarding 2.2, considering mixed broadcast/non-broadcast writes, it has to
> flush/finish the last broadcast BufferBuilder for the current non-broadcast
> write and vice versa.
> I agree with your proposal[2] for this issue, and we can
> consider it further in future; maybe there are better ways of avoiding
> it.
>
> 4. My previous thought was to realize both 2.1 and 2.2 above. 2.1 is your
> proposal[c], which has no problem with mixed write mode, so it needs no
> additional flush. 2.2 is just your proposal[2], which involves the additional
> flush. Maybe my last reply made you misunderstand.
>
> I can submit a jira for 2.1 above first if there are no other concerns, thanks
> for the helpful advice. :)
>
> Best,
>
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Wednesday, 18 Jul 2018, 20:04
> To: Zhijiang(wangzhijiang999) <[email protected]>; Nico Kruber
> <[email protected]>
> Cc: dev <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
>
> Hi
>
> 1. I want to define a new AbstractRecordWriter as a base class which defines
> some abstract methods and utility code. The current RecordWriter, used for
> other partitioners, and the new BroadcastRecordWriter, used only for
> BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in
> BroadcastPartitioner are exactly as you showed below, but the current
> RecordWriter also only needs one RecordSerializer if we make the
> RecordSerializer have no internal state.
>
> Let's first discuss what we would like to have/implement at a higher level and
> later focus on implementation details. Regarding making RecordSerializer
> stateless, there were some discussions about it previously and it was on our
> TODO list, but I don’t remember what was holding us back. Maybe Nico will
> remember?
>
>
> 2. You pointed out the key problem of how to handle `randomEmit` in
> BroadcastRecordWriter, and I think this process may reuse the `emit` logic in
> the current RecordWriter. Then the `emit` and `broadcastEmit` logic in
> BroadcastRecordWriter will serialize data only once and copy to BufferBuilder
> only once.
> So this improvement is deterministic for BroadcastPartitioner.
>
>
> What logic to reuse do you have in mind?
> 4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast
> partitioners, we can also do as you suggested in option [2], but it has to
> finish/flush the previous BufferBuilder generated by the common `emit`
> operation. So it may have a bad impact on buffer utilization, which was
> improved a lot by the event-driven flush feature. So I am not sure whether it
> is worth doing the `broadcastEmit` improvement in RecordWriter.
>
>
> The whole point of my proposal [c] was to avoid the need to flush. The code
> would need a little bit more refactoring but it should look something like
> this:
>
> void broadcastEmit(record):
>   serializedRecord = serializer.serialize(record)
>   for bufferBuilder in bufferBuilders:
>     bufferBuilder.append(serializedRecord)
>     // if we overfilled bufferBuilder, finish it, request a new one and
>     // continue writing
>
> void emit(record, channel):
>   serializedRecord = serializer.serialize(record)
>   bufferBuilders[channel].append(serializedRecord)
>   // if we overfilled bufferBuilder, finish it, request a new one and
>   // continue writing
>
> I do not see here a need for additional flushes and it should be a strict
> improvement over the current code base.
>
>
> I already implemented a demo covering 1, 2, 5 above before. I can create jiras
> after we reach a final agreement, then maybe you can help review the PR if
> you have time. :)
>
>
> Sure :)
>
> Piotrek
>
> Best,
>
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Wednesday, 18 Jul 2018, 16:37
> To: dev <[email protected]>; Zhijiang(wangzhijiang999)
> <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hi,
>
> A couple more thoughts
>
> a) I’m not sure if you would have to modify the current RecordWriter at all.
> You could extract an interface from the current RecordWriter and just provide
> two implementations: the current one and BroadcastRecordWriter. I’m not sure,
> but it doesn’t seem like they would duplicate/share lots of code.
> BroadcastRecordWriter would have fields:
>
> private final RecordSerializer<T> serializers;
>
> private final Optional<BufferBuilder> bufferBuilder;
>
> Compared to RecordWriter’s arrays.
>
> b) One thing that I noticed now is latency markers and the randomEmit method.
> It prevents us from implementing option [1]. BroadcastRecordWriter would have
> to flush all channels on randomEmit (as I proposed in option [2]).
>
> c) Another option to optimise broadcast writes (or for that matter all multi
> channel writes) would be to serialise the record only once to
> SpanningRecordSerializer#serializationBuffer, but copy it multiple times to
> separate BufferBuilders. That would save us much more than half of the
> overhead (serialisation is more costly compared to data copying), while we
> would avoid problems with the uneven state of channels. There would be no
> problems with mixed broadcast/non broadcast writes; this option could support
> both of them at the same time - in other words, it would be as generic as the
> current one.
>
> d) Regarding StreamRecordWriter, another option is that it could be refactored
> into a class implementing the extracted RecordWriter interface and being a
> proxy/wrapper around another RecordWriter instance:
>
> class StreamRecordWriter implements RecordWriter {
>   private final RecordWriter recordWriter; // either broadcast or non broadcast
>   public void foo() {
>     recordWriter.foo();
>   }
> }
>
> To be honest I’m not sure at the moment which one would be better, [2] or [c].
> In an ideal world, we might want to replace the current RecordWriter with [c]
> and after that (if that’s not enough) implement [2] on top of [c].
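
Option [c] from the message above (serialise once, copy into every target channel's buffer) could look roughly like this toy version. It is a sketch under assumptions: `ByteArrayOutputStream` stands in for a per-channel BufferBuilder, records are plain strings, and `SerializeOnceWriter`, `serializeCalls` and `contents` are hypothetical names rather than anything in Flink.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Driver mixing broadcast and single-channel writes without any extra flush. */
final class SerializeOnceDemo {
    public static void main(String[] args) {
        SerializeOnceWriter writer = new SerializeOnceWriter(3);
        writer.broadcastEmit("A");
        writer.emit("b", 1);
        writer.broadcastEmit("C");
        for (int channel = 0; channel < 3; channel++) {
            System.out.println(channel + ": " + writer.contents(channel));
        }
        System.out.println("serialize calls: " + writer.serializeCalls);
    }
}

/** Toy writer: one serialisation per record, one copy per target channel. */
final class SerializeOnceWriter {
    // Stand-ins for per-channel BufferBuilders (assumption: growable, never "full").
    private final ByteArrayOutputStream[] channelBuffers;
    // Counts how often the (assumed expensive) serialisation step runs.
    int serializeCalls = 0;

    SerializeOnceWriter(int numChannels) {
        channelBuffers = new ByteArrayOutputStream[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelBuffers[i] = new ByteArrayOutputStream();
        }
    }

    private byte[] serialize(String record) {
        serializeCalls++;
        return record.getBytes(StandardCharsets.UTF_8);
    }

    /** Serialise once, then copy the same bytes into every channel's buffer. */
    void broadcastEmit(String record) {
        byte[] bytes = serialize(record);
        for (ByteArrayOutputStream buffer : channelBuffers) {
            buffer.write(bytes, 0, bytes.length);
        }
    }

    /** Serialise once and copy into the single selected channel's buffer. */
    void emit(String record, int channel) {
        byte[] bytes = serialize(record);
        channelBuffers[channel].write(bytes, 0, bytes.length);
    }

    /** Returns everything written to the given channel so far, in write order. */
    String contents(int channel) {
        return new String(channelBuffers[channel].toByteArray(), StandardCharsets.UTF_8);
    }
}
```

In this toy run the three records cost three serialisation passes, where serialising separately per channel would cost seven (3 + 1 + 3), and each channel still sees its records in write order. The overfilled-buffer handling from the quoted pseudocode is left out because the stand-in buffers grow on demand.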
>
> Piotrek
>
>> On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999)
>> <[email protected]> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your replies and professional suggestions!
>>
>> My initial thought is just as you said in your first suggestion. The current
>> RecordWriter will emit a StreamRecord to some subpartition via
>> ChannelSelector or broadcast events/watermarks to all subpartitions directly.
>> If the ChannelSelector implementation is BroadcastPartitioner, then we can
>> create a specialized BroadcastRecordWriter to handle 'emit',
>> 'broadcastEmit', 'broadcastEvent', etc.
>> To make it less tricky, I want to abstract the RecordWriter as a
>> plugin, then implement a BroadcastRecordWriter and a NonBroadcastRecordWriter
>> separately, both extending the abstract RecordWriter. That means we divide
>> the RecordWriter by ChannelSelector, and we may also remove the current
>> StreamRecordWriter to unify the RecordWriter for both stream and
>> batch mode.
>>
>> Considering specific implementations, I think one RecordSerializer can work
>> for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the
>> precondition is that the RecordSerializer has no internal state, so we
>> have to remove the BufferBuilder variable from SpanningRecordSerializer and
>> pass it via the addRecord/continueWritingWithNextBufferBuilder
>> methods from RecordWriter. BroadcastRecordWriter only needs to maintain one
>> BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need
>> to maintain one BufferBuilder per subpartition.
>>
>> Another issue is whether this improvement is suitable for
>> broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in
>> suggestions 2 and 3. I wonder whether it may decrease the buffer utilization
>> when switching between broadcast and non-broadcast modes, and it may also be
>> more tricky to implement. I am still thinking about it.
>>
>> Maybe we can implement the improvement for BroadcastPartitioner as a first
>> step and make sure there is one RecordSerializer for all subpartitions. That
>> can reduce the memory overhead in RecordSerializer and the time cost in
>> broadcast serialization scenarios.
>>
>> Best,
>>
>> Zhijiang
>>
>>
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <[email protected]>
>> Sent: Tuesday, 17 Jul 2018, 23:31
>> To: dev <[email protected]>; Zhijiang(wangzhijiang999)
>> <[email protected]>
>> Subject: Re: [DISCUSS] Improve broadcast serialization
>>
>> Hi
>>
>> Generally speaking this would be a nice optimisation, however it might be
>> tricky to implement. The thing to keep in mind is that currently the
>> interface allows interleaving broadcasting and normal sending; because of
>> that, at any given time some serialisers can have more data than others. For
>> example when we have two output channels and we are looping over the
>> following writes:
>>
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> Write sth to 1. Channel
>> Broadcast to all channels
>> (…)
>>
>> Thus buffers of different channels can fill up at different rates.
>>
>>> In theory every record can be serialized only once and referenced for all
>>> the subpartitions in broadcast mode.
>>
>> The problem here is that after serialising records, the only unit that can
>> be referenced afterwards is “Buffer”. So that would leave us now with a
>> couple of options:
>>
>> 1. Create a specialised BroadcastRecordWriter that supports ONLY
>> broadcasting, guaranteeing that all channels always receive the same data.
>> Here you could serialise records only once, to one BufferBuilder that could
>> be shared and referenced by multiple BufferConsumers from different
>> channels. Any non broadcast write would have to fail.
>>
>> 2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in
>> 1.
>> for broadcasts, but for any non broadcast write: finish the current
>> broadcasting BufferBuilder, flush all data on all channels, serialise the
>> single record to a single channel using a newly created BufferBuilder and
>> also immediately finish/flush it, so that any subsequent broadcasts will work
>> again as in 1.
>>
>> 3. Similar to 2, but lazily switch between broadcasting and non-broadcasting
>> modes. It would have two modes of operation that could be switched back and
>> forth: the same as currently implemented for non-broadcast writes, and an
>> optimised broadcast mode:
>>
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Broadcast to all channels
>> Write sth to X Channel // this flushes all channels and clears/finishes
>>                        // previous BufferBuilder
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to Y Channel
>> Write sth to X Channel
>> Broadcast to all channels // this flushes all channels and clears/finishes
>>                           // previous BufferBuilders
>> Broadcast to all channels
>> Broadcast to all channels
>> (…)
>>
>> However both in 2. and 3. there would be a very big penalty for mixing
>> broadcast with normal writes.
>>
>> Piotrek
>>
>>> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999)
>>> <[email protected]> wrote:
>>>
>>> Hi all,
>>>
>>> In the current implementation, a RecordSerializer is created separately for
>>> each subpartition in RecordWriter, which means the number of serializers
>>> equals the number of subpartitions.
>>> For the broadcast partitioner, every record will be serialized many times,
>>> once for each subpartition, and this may hurt performance to some extent.
>>> In theory every record can be serialized only once and referenced for all
>>> the subpartitions in broadcast mode.
>>>
>>> To do so, I propose the following changes:
>>> 1. Create and maintain only one serializer in RecordWriter, and it will
>>> serialize the record for all the subpartitions.
>>> It makes sense for any
>>> partitioner, and the memory overhead can also be decreased, because every
>>> serializer maintains some separate byte buffers internally.
>>> 2. Maybe we can abstract the RecordWriter as a base class used for the other
>>> partitioner modes and implement a BroadcastRecordWriter for
>>> BroadcastPartitioner. This new implementation will add buffer
>>> references based on the number of subpartitions before adding into the
>>> subpartition queue.
>>> 3. Maybe we can remove StreamRecordWriter by migrating the flusher from it
>>> to RecordWriter, then the uniform RecordWriter can be used for both stream
>>> and batch. The above BroadcastRecordWriter can also be uniform for both
>>> stream and batch.
>>>
>>> I am not sure whether this improvement has been proposed before; what do you
>>> think of it?
>>> If necessary I can create JIRAs to contribute it, and may need one committer
>>> to cooperate with me.
>>>
>>> Best,
>>>
>>> Zhijiang
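
The "copy only once" step (2.2) that the thread converges on, with one shared BufferBuilder and a separate BufferConsumer per channel, might be pictured with the toy model below. It is only a sketch under assumptions: `SharedBuffer`, `Reader`, `newReader` and `poll` are hypothetical names, the backing array grows on demand (a real network buffer is a fixed-size memory segment), and recycling once every reader has caught up is not modelled.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Driver: two channels read the same bytes at their own pace. */
final class SharedBufferDemo {
    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();
        SharedBuffer.Reader fast = buffer.newReader();
        SharedBuffer.Reader slow = buffer.newReader();
        buffer.append("ab".getBytes(StandardCharsets.UTF_8));
        System.out.println("fast: " + fast.poll());
        buffer.append("cd".getBytes(StandardCharsets.UTF_8));
        System.out.println("fast: " + fast.poll());
        System.out.println("slow: " + slow.poll());
    }
}

/** One append-only byte region written once and shared by all readers. */
final class SharedBuffer {
    private byte[] data = new byte[16];
    private int writePos = 0;

    /** Copies the serialised bytes into the shared region exactly once. */
    void append(byte[] bytes) {
        if (writePos + bytes.length > data.length) {
            data = Arrays.copyOf(data, Math.max(data.length * 2, writePos + bytes.length));
        }
        System.arraycopy(bytes, 0, data, writePos, bytes.length);
        writePos += bytes.length;
    }

    /** Creates an independent read cursor, one per channel. */
    Reader newReader() {
        return new Reader();
    }

    /** Per-channel view: tracks only its own read position, never copies on write. */
    final class Reader {
        private int readPos = 0;

        /** Returns the bytes appended since this reader's last poll (ASCII in this demo). */
        String poll() {
            String unread = new String(data, readPos, writePos - readPos, StandardCharsets.UTF_8);
            readPos = writePos;
            return unread;
        }
    }
}
```

The slow reader holding an old position is the reason raised in the thread for why a shared buffer cannot be recycled until every channel has consumed it.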
