Hi

> 1. I want to define a new AbstractRecordWriter as a base class which defines
> some abstract methods and utility code. The current RecordWriter, used for
> the other partitioners, and the new BroadcastRecordWriter, used only for
> BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in
> BroadcastRecordWriter are exactly as you showed below, but the current
> RecordWriter also only needs one RecordSerializer if we make the
> RecordSerializer have no internal state.

Let’s first discuss what we would like to have/implement at a higher level and
focus on implementation details later. Regarding making RecordSerializer
stateless, there were some discussions about it previously and it was on our
TODO list, but I don’t remember what was holding us back. Maybe Nico will
remember?

> 2. You pointed out the key problem of how to handle `randomEmit` in
> BroadcastRecordWriter, and I think this process may reuse the `emit` logic in
> the current RecordWriter. Then the `emit` and `broadcastEmit` logic in
> BroadcastRecordWriter will serialize data only once and copy it to the
> BufferBuilder only once. So this improvement is deterministic for
> BroadcastPartitioner.

What logic to reuse do you have in mind?

> 4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast
> partitioners, we can also do as you suggested in option [2], but it has to
> finish/flush the previous BufferBuilder generated by the common `emit`
> operation. So it may have a bad impact on buffer utilization, which was
> improved considerably by the event-driven flush feature. So I am not sure
> whether it is worth doing the `broadcastEmit` improvement in RecordWriter.

The whole point of my proposal [c] was to avoid the need to flush. The code
would need a little bit more refactoring, but it should look something like
this:

    void broadcastEmit(record):
        serializedRecord = serializer.serialize(record)
        for bufferBuilder in bufferBuilders:
            bufferBuilder.append(serializedRecord)
            // if we overfilled bufferBuilder, finish it, request new one
            // and continue writing

    void emit(record, channel):
        serializedRecord = serializer.serialize(record)
        bufferBuilders[channel].append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request new one
        // and continue writing

I do not see a need for additional flushes here, and it should be a strict
improvement over the current code base.

> I have already implemented a demo covering points 1, 2 and 5 above.
> I can create JIRAs
> after we reach a final agreement, then maybe you can help review the PR if
> you have time. :)

Sure :)

Piotrek

> Best,
>
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <pi...@data-artisans.com>
> Sent: Wednesday, 18 July 2018, 16:37
> To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999)
> <wangzhijiang...@aliyun.com>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hi,
>
> A couple more thoughts:
>
> a) I’m not sure if you would have to modify the current RecordWriter at all.
> You could extract an interface from the current RecordWriter and just provide
> two implementations: the current one and BroadcastRecordWriter. I’m not sure,
> but it doesn’t seem like they would duplicate/share lots of code.
> BroadcastRecordWriter would have fields:
>
>     private final RecordSerializer<T> serializer;
>
>     private final Optional<BufferBuilder> bufferBuilder;
>
> Compared to RecordWriter’s arrays.
>
> b) One thing that I have noticed now is latency markers and the randomEmit
> method. It prevents us from implementing option [1]. BroadcastRecordWriter
> would have to flush all channels on randomEmit (as I proposed in option [2]).
>
> c) Another option to optimise broadcast writes (or for that matter all
> multi-channel writes) would be to serialise the record only once to
> SpanningRecordSerializer#serializationBuffer, but copy it multiple times to
> separate BufferBuilders. That would save us much more than half of the
> overhead (serialisation is more costly than data copying), while we
> would avoid problems with the uneven state of channels. There would be no
> problems with mixed broadcast/non-broadcast writes; this option could support
> both of them at the same time. In other words, it would be as generic as the
> current one.
>
> d) Regarding StreamRecordWriter, another option is that it could be
> refactored into a class implementing the extracted RecordWriter interface and
> being a proxy/wrapper around another RecordWriter instance:
>
>     class StreamRecordWriter implements RecordWriter {
>         // either broadcast or non-broadcast
>         private final RecordWriter recordWriter;
>         public void foo() {
>             recordWriter.foo();
>         }
>     }
>
> To be honest, I’m not sure at the moment which one would be better: [2] or
> [c]. In an ideal world, we might want to replace the current RecordWriter
> with [c] and after that (if that’s not enough) implement [2] on top of [c].
>
> Piotrek
>
> > On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999)
> > <wangzhijiang...@aliyun.com.INVALID> wrote:
> >
> > Hi Piotr,
> >
> > Thanks for your replies and professional suggestions!
> >
> > My initial thought is just as you said in your first suggestion. The current
> > RecordWriter will emit a StreamRecord to some subpartition via
> > ChannelSelector, or broadcast events/watermarks to all subpartitions
> > directly. If the ChannelSelector implementation is BroadcastPartitioner,
> > then we can create a specialized BroadcastRecordWriter to handle 'emit',
> > 'broadcastEmit', 'broadcastEvent', etc.
> > To make it less tricky, I want to abstract the RecordWriter as a
> > plugin, then implement a BroadcastRecordWriter and a
> > NonBroadcastRecordWriter separately, both extending the abstract
> > RecordWriter. That means we divide the RecordWriter by ChannelSelector, and
> > we may also remove the current StreamRecordWriter to unify the RecordWriter
> > criteria in both stream and batch mode.
> >
> > Considering specific implementations, I think one RecordSerializer can work
> > for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the
> > precondition is making the RecordSerializer have no internal state, so we
> > have to remove the BufferBuilder variable from SpanningRecordSerializer and
> > pass it via the addRecord/continueWritingWithNextBufferBuilder
> > methods from RecordWriter. BroadcastRecordWriter only needs to maintain one
> > BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need
> > to maintain one BufferBuilder per subpartition.
> >
> > Another issue is whether this improvement is suitable for
> > broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in
> > suggestions 2 and 3. I wonder whether it may decrease buffer utilization
> > when switching between broadcast and non-broadcast modes, and it may also be
> > more tricky to implement. I am still thinking about it.
> >
> > Maybe we can implement the improvement for BroadcastPartitioner as a first
> > step and make sure there is one RecordSerializer for all subpartitions. That
> > can reduce the memory overhead in RecordSerializer and the time cost in
> > broadcast serialization scenarios.
> >
> > Best,
> >
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From: Piotr Nowojski <pi...@data-artisans.com>
> > Sent: Tuesday, 17 July 2018, 23:31
> > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999)
> > <wangzhijiang...@aliyun.com>
> > Subject: Re: [DISCUSS] Improve broadcast serialization
> >
> > Hi
> >
> > Generally speaking this would be a nice optimisation, however it might be
> > tricky to implement. The thing to keep in mind is that currently the
> > interface allows interleaving broadcasting and normal sending; because of
> > that, at any given time some serialisers can have more data than others. For
> > example, when we have two output channels and we are looping over the
> > following writes:
> >
> > Write something to channel 1
> > Broadcast to all channels
> > Write something to channel 1
> > Broadcast to all channels
> > Write something to channel 1
> > Broadcast to all channels
> > (…)
> >
> > Thus buffers of different channels can fill up at different rates.
> >
> >> In theory every record can be serialized only once and referenced for all
> >> the subpartitions in broadcast mode.
> >
> > The problem here is that after serialising records, the only unit that can
> > be referenced afterwards is a “Buffer”. So that would leave us now with a
> > couple of options:
> >
> > 1. Create a specialised BroadcastRecordWriter that supports ONLY
> > broadcasting, guaranteeing that all channels always receive the same data.
> > Here you could serialise records only once, to one BufferBuilder that could
> > be shared and referenced by multiple BufferConsumers from different
> > channels. Any non-broadcast write would have to fail.
> >
> > 2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as
> > in 1. for broadcasts, but for any non-broadcast write: finish the current
> > broadcasting BufferBuilder, flush all data on all channels, serialise the
> > single record to a single channel using a newly created BufferBuilder and
> > also immediately finish/flush it, so that any subsequent broadcasts will
> > work again as in 1.
> >
> > 3. Similar to 2, but lazily switch between broadcasting and
> > non-broadcasting modes.
> > It would have two modes of operation that could be
> > switched back and forth: the same as currently implemented for
> > non-broadcast writes, and an optimised broadcast mode:
> >
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Write something to channel X // this flushes all channels and
> > clears/finishes the previous BufferBuilder
> > Write something to channel Y
> > Write something to channel Y
> > Write something to channel Y
> > Write something to channel X
> > Broadcast to all channels // this flushes all channels and clears/finishes
> > the previous BufferBuilders
> > Broadcast to all channels
> > Broadcast to all channels
> > (…)
> >
> > However, both in 2. and 3. there would be a very big penalty for mixing
> > broadcasts with normal writes.
> >
> > Piotrek
> >
> >> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999)
> >> <wangzhijiang...@aliyun.com.INVALID> wrote:
> >>
> >> Hi all,
> >>
> >> In the current implementation, a RecordSerializer is created separately for
> >> each subpartition in RecordWriter, which means the number of serializers
> >> equals the number of subpartitions.
> >> For the broadcast partitioner, every record will be serialized many times,
> >> once for each subpartition, and this may hurt performance to some extent.
> >> In theory every record can be serialized only once and referenced for all
> >> the subpartitions in broadcast mode.
> >>
> >> To do so, I propose the following changes:
> >> 1. Create and maintain only one serializer in RecordWriter, and it will
> >> serialize the record for all the subpartitions. It makes sense for any
> >> partitioner, and the memory overhead can also be decreased, because every
> >> serializer maintains some separate byte buffers internally.
> >> 2. Maybe we can abstract the RecordWriter as a base class used for the
> >> other partitioner modes and implement a BroadcastRecordWriter for
> >> BroadcastPartitioner.
> >> And this new implementation will add buffer
> >> references based on the number of subpartitions before adding them to the
> >> subpartition queue.
> >> 3. Maybe we can remove StreamRecordWriter by migrating the flusher from it
> >> to RecordWriter, then the unified RecordWriter can be used for both stream
> >> and batch. The above BroadcastRecordWriter can also be unified for both
> >> stream and batch.
> >>
> >> I am not sure whether this improvement has been proposed before; what do
> >> you think of it?
> >> If necessary I can create JIRAs to contribute it, and I may need one
> >> committer to cooperate with me.
> >>
> >> Best,
> >>
> >> Zhijiang
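
For reference, option [c] from this thread (serialize a record once, then copy the bytes into each channel's own buffer) can be sketched in Java roughly as below. This is only an illustration under stated assumptions: ToyBufferBuilder and CopyingRecordWriter are hypothetical stand-ins, not Flink classes; records are plain strings encoded as UTF-8; record framing, buffer pools and network I/O are left out. It is meant to show that interleaved emit/broadcastEmit calls need no extra flush even though channels fill at different rates.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-channel BufferBuilder: a fixed-size byte buffer.
final class ToyBufferBuilder {
    private final byte[] data;
    private int position;

    ToyBufferBuilder(int capacity) {
        data = new byte[capacity];
    }

    // Copies as many bytes as still fit, starting at offset; returns the count copied.
    int append(byte[] src, int offset) {
        int n = Math.min(src.length - offset, data.length - position);
        System.arraycopy(src, offset, data, position, n);
        position += n;
        return n;
    }

    boolean isFull() {
        return position == data.length;
    }

    int size() {
        return position;
    }
}

// Hypothetical writer for option [c]: one serialization per record, one copy per channel.
final class CopyingRecordWriter {
    private final int bufferSize;
    private final ToyBufferBuilder[] current;                 // open builder per channel
    private final List<List<ToyBufferBuilder>> finished = new ArrayList<>();
    int serializations;                                       // number of serialize() calls

    CopyingRecordWriter(int channels, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.bufferSize = bufferSize;
        this.current = new ToyBufferBuilder[channels];
        for (int c = 0; c < channels; c++) {
            current[c] = new ToyBufferBuilder(bufferSize);
            finished.add(new ArrayList<>());
        }
    }

    private byte[] serialize(String record) {
        serializations++;
        return record.getBytes(StandardCharsets.UTF_8);
    }

    void emit(String record, int channel) {
        copy(serialize(record), channel);
    }

    void broadcastEmit(String record) {
        byte[] bytes = serialize(record);                     // serialized exactly once
        for (int c = 0; c < current.length; c++) {
            copy(bytes, c);                                   // copied once per channel
        }
    }

    // Appends to the channel's builder; a full builder is finished and replaced.
    private void copy(byte[] bytes, int channel) {
        int offset = 0;
        while (offset < bytes.length) {
            offset += current[channel].append(bytes, offset);
            if (current[channel].isFull()) {
                finished.get(channel).add(current[channel]);
                current[channel] = new ToyBufferBuilder(bufferSize);
            }
        }
    }

    int finishedBuffers(int channel) {
        return finished.get(channel).size();
    }

    int pendingBytes(int channel) {
        return current[channel].size();
    }
}
```

With two channels and 4-byte buffers, emit("abc", 0) followed by broadcastEmit("xyz") performs two serializations in total. Channel 0 ends up with one finished buffer and 2 pending bytes, channel 1 with 3 pending bytes, and neither call forces the other channel's buffer to be finished early.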