Hi 

> 1. I want to define a new AbstractRecordWriter as a base class that defines 
> some abstract methods and utility code. The current RecordWriter, used for 
> the other partitioners, and a new BroadcastRecordWriter, used only for 
> BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in 
> BroadcastPartitioner are exactly as you showed below, but the current 
> RecordWriter also needs only one RecordSerializer if we make the 
> RecordSerializer stateless.

Let's first discuss what we would like to have/implement at a higher level and 
focus on implementation details later. Regarding making RecordSerializer 
stateless, there were some discussions about it previously and it was on our 
TODO list, but I don’t remember what was holding us back. Maybe Nico will 
remember?

> 
> 2. You pointed out the key problem of how to handle `randomEmit` in 
> BroadcastRecordWriter, and I think this process may reuse the `emit` logic in 
> the current RecordWriter. Then the `emit` and `broadcastEmit` logic in 
> BroadcastRecordWriter will serialize data only once and copy it to the 
> BufferBuilder only once. So this is a definite improvement for 
> BroadcastPartitioner.
> 

What logic to reuse do you have in mind? 
 
> 
> 4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast 
> partitioners, we can also do as you suggested in option [2], but it has to 
> finish/flush the previous BufferBuilder generated by the common `emit` 
> operation. So it may hurt buffer utilization, which was improved a lot by the 
> event-driven flush feature. So I am not sure whether the `broadcastEmit` 
> improvement in RecordWriter is worth doing.
> 

The whole point of my proposal [c] was to avoid the need to flush. Code would 
need a little bit more refactoring but it should look something like this:

void broadcastEmit(record):
        serializedRecord = serializer.serialize(record)
        for bufferBuilder in bufferBuilders:
                bufferBuilder.append(serializedRecord)
                // if we overfilled bufferBuilder, finish it, request a new one
                // and continue writing

void emit(record, channel):
        serializedRecord = serializer.serialize(record)
        bufferBuilders[channel].append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request a new one
        // and continue writing

I do not see a need for additional flushes here, and it should be a strict 
improvement over the current code base.
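
To make the pseudocode above a bit more concrete, here is a minimal, 
self-contained Java sketch. Note that SketchBufferBuilder and 
SketchRecordWriter are hypothetical stand-ins that I made up for illustration, 
they are not the real Flink classes, and the length-prefixed String 
serialization is also just an assumption. The only point is to show one 
serialization per record, one copy per target channel, and a full buffer being 
finished and replaced without any flush:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-channel BufferBuilder (not Flink's class).
final class SketchBufferBuilder {
    private final ByteBuffer buffer;

    SketchBufferBuilder(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Copies as many bytes as fit; the source position advances by that amount.
    void append(ByteBuffer source) {
        int n = Math.min(buffer.remaining(), source.remaining());
        ByteBuffer slice = source.duplicate();
        slice.limit(slice.position() + n);
        buffer.put(slice);
        source.position(source.position() + n);
    }

    boolean isFull() { return !buffer.hasRemaining(); }

    int written() { return buffer.position(); }
}

// Hypothetical writer: one serialization per record, one copy per channel.
final class SketchRecordWriter {
    private final int bufferSize; // assumed to be > 0
    private final SketchBufferBuilder[] current;
    final List<List<SketchBufferBuilder>> finished = new ArrayList<>();
    int serializeCalls = 0;

    SketchRecordWriter(int channels, int bufferSize) {
        this.bufferSize = bufferSize;
        this.current = new SketchBufferBuilder[channels];
        for (int ch = 0; ch < channels; ch++) {
            current[ch] = new SketchBufferBuilder(bufferSize);
            finished.add(new ArrayList<>());
        }
    }

    // Assumed wire format for the sketch: 4-byte length followed by UTF-8 bytes.
    private ByteBuffer serialize(String record) {
        serializeCalls++;
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length).put(payload).flip();
        return out;
    }

    // Copies the serialized bytes to one channel, rolling over full buffers.
    private void copyTo(int channel, ByteBuffer serialized) {
        ByteBuffer view = serialized.duplicate(); // independent read position
        while (view.hasRemaining()) {
            current[channel].append(view);
            if (current[channel].isFull()) {
                // finish the full buffer and request a new one, no flush needed
                finished.get(channel).add(current[channel]);
                current[channel] = new SketchBufferBuilder(bufferSize);
            }
        }
    }

    void emit(String record, int channel) {
        copyTo(channel, serialize(record));
    }

    void broadcastEmit(String record) {
        ByteBuffer serialized = serialize(record); // serialized only once
        for (int ch = 0; ch < current.length; ch++) {
            copyTo(ch, serialized);
        }
    }

    int bytesWritten(int channel) {
        int total = current[channel].written();
        for (SketchBufferBuilder b : finished.get(channel)) {
            total += b.written();
        }
        return total;
    }
}
```

In this sketch each channel keeps its own buffer, so channels can fill at 
different rates and emit/broadcastEmit can be interleaved freely. The price is 
that a broadcast still copies the bytes once per channel.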

> 
> I have already implemented a demo covering 1, 2 and 5 above. I can create 
> JIRAs after we reach a final agreement, then maybe you can help review the 
> PR if you have time. :)
> 

Sure :)

Piotrek
> 
> Best,
> 
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <pi...@data-artisans.com>
> Sent: Wednesday, 18 July 2018, 16:37
> To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) 
> <wangzhijiang...@aliyun.com>
> Subject: Re: [DISCUSS] Improve broadcast serialization
> 
> Hi,
> 
> A couple more thoughts:
> 
> a) I’m not sure if you would have to modify current RecordWriter at all. You 
> could extract interface from current RecordWriter and just provide two 
> implementations: current one and BroadcastRecordWriter. I’m not sure, but it 
> doesn’t seem like they would duplicate/share lots of code. 
> BroadcastRecordWriter would have fields:
> 
> private final RecordSerializer<T> serializer;
> 
> private final Optional<BufferBuilder> bufferBuilder;
> 
> Compared to RecordWriter’s arrays.
> 
> b) One thing that I noticed now are latency markers and randomEmit method. It 
> prevents us from implementing option [1]. BroadcastRecordWriter would have to 
> flush all channels on randomEmit (as I proposed in option [2]).
> 
> c) Another option to optimise broadcast writes (or for that matter all multi 
> channel writes), would be to serialise record only once to 
> SpanningRecordSerializer#serializationBuffer, but copy it multiple times to 
> separate BufferBuilders. That would save us much more than half of the 
> overhead (serialisation is more costly compared to data copying), while we 
> would avoid problems with uneven state of channels. There would be no 
> problems with mixed broadcast/non broadcast writes, this option could support 
> both of them at the same time - in other words, it would be as generic as the 
> current one.
> 
> d) Regarding StreamRecordWriter, another option is to refactor it into a 
> class that implements the extracted RecordWriter interface and acts as a 
> proxy/wrapper around another RecordWriter instance:
> 
> class StreamRecordWriter implements RecordWriter {
>   // either broadcast or non-broadcast
>   private final RecordWriter recordWriter;
>   public void foo() {
>     recordWriter.foo();
>   }
> }
> 
> To be honest, I’m not sure at the moment which one would be better, [2] or 
> [c]. In an ideal world, we might want to replace the current RecordWriter 
> with [c] and after that (if that’s not enough) implement [2] on top of [c]. 
> 
> Piotrek
> 
> > On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999) 
> > <wangzhijiang...@aliyun.com.INVALID> wrote:
> > 
> > Hi Piotr,
> > 
> > Thanks for your replies and professional suggestions!
> > 
> > My initial thought is just what you said in your first suggestion. The 
> > current RecordWriter will emit a StreamRecord to some subpartition via 
> > ChannelSelector, or broadcast events/watermarks to all subpartitions 
> > directly.
> > If the ChannelSelector implementation is BroadcastPartitioner, then we can 
> > create a specialized BroadcastRecordWriter to handle 'emit', 
> > 'broadcastEmit', 'broadcastEvent', etc.
> > To make this less tricky, I want to abstract the RecordWriter as a 
> > plugin, then implement a BroadcastRecordWriter and a 
> > NonBroadcastRecordWriter separately, both extending the abstract 
> > RecordWriter. That means we split the RecordWriter by ChannelSelector, and 
> > we may also remove the current StreamRecordWriter to unify the RecordWriter 
> > for both stream and batch mode.
> > 
> > Considering specific implementations, I think one RecordSerializer can work 
> > for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the 
> > precondition is making the RecordSerializer stateless, so we have to 
> > remove the BufferBuilder variable from SpanningRecordSerializer and 
> > pass it in from RecordWriter via the 
> > addRecord/continueWritingWithNextBufferBuilder methods. 
> > BroadcastRecordWriter only needs to maintain one BufferBuilder for all 
> > subpartitions, while NonBroadcastRecordWriter may need to maintain one 
> > BufferBuilder per subpartition.
> > 
> > Another issue is whether this improvement is suitable for 
> > broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in 
> > suggestions 2 and 3. I suspect it may decrease buffer utilization when 
> > switching between broadcast and non-broadcast modes, and it may also be 
> > trickier to implement. I am still thinking about it.
> > 
> > Maybe we can implement the improvement for BroadcastPartitioner as a first 
> > step and make sure there is one RecordSerializer for all subpartitions. 
> > That can reduce the memory overhead in RecordSerializer and the time cost 
> > in broadcast serialization scenarios.
> > 
> > Best,
> > 
> > Zhijiang
> > 
> > 
> > ------------------------------------------------------------------
> > From: Piotr Nowojski <pi...@data-artisans.com>
> > Sent: Tuesday, 17 July 2018, 23:31
> > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) 
> > <wangzhijiang...@aliyun.com>
> > Subject: Re: [DISCUSS] Improve broadcast serialization
> > 
> > Hi
> > 
> > Generally speaking, this would be a nice optimisation, but it might be 
> > tricky to implement. The thing to keep in mind is that the current 
> > interface allows interleaving broadcasting and normal sending, so at any 
> > given time some serialisers can hold more data than others. For example, 
> > when we have two output channels and we are looping the following writes:
> > 
> > Write something to channel 1
> > Broadcast to all channels
> > Write something to channel 1
> > Broadcast to all channels
> > Write something to channel 1
> > Broadcast to all channels
> > (…)
> > 
> > Thus buffers of different channels can fill up at different rates.
> > 
> >> In theory every record can be serialized only once and referenced for all 
> >> the subpartitions in broadcast mode.
> > 
> > The problem here is that once records are serialised, the only unit that 
> > can be referenced afterwards is a “Buffer”. So that leaves us with a 
> > couple of options:
> > 
> > 1. Create a specialised BroadcastRecordWriter that supports ONLY 
> > broadcasting, guaranteeing that all channels always receive the same data. 
> > Here you could serialise records only once, to one BufferBuilder that could 
> > be shared and referenced by multiple BufferConsumers from different 
> > channels. Any non broadcast write would have to fail.
> > 
> > 2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as 
> > in 1. for broadcasts, but for any non-broadcast write: finish the current 
> > broadcasting BufferBuilder, flush all data on all channels, serialise the 
> > single record to a single channel using a newly created BufferBuilder and 
> > also immediately finish/flush it, so that any subsequent broadcasts will 
> > work again as in 1.
> > 
> > 3. Similar as 2, but lazily switch between broadcasting and 
> > non-broadcasting modes. It would have two modes of operating that could be 
> > switched back and forth: the same as currently implemented for 
> > non-broadcasted and optimised broadcast mode
> > 
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Write something to channel X // this flushes all channels and 
> >                              // clears/finishes the previous BufferBuilder
> > Write something to channel Y
> > Write something to channel Y
> > Write something to channel Y
> > Write something to channel X
> > Broadcast to all channels // this flushes all channels and 
> >                           // clears/finishes the previous BufferBuilders
> > Broadcast to all channels
> > Broadcast to all channels
> > (…)
> > 
> > However, in both 2. and 3. there would be a very big penalty for mixing 
> > broadcast with normal writes.  
> > 
> > Piotrek
> > 
> >> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
> >> <wangzhijiang...@aliyun.com.INVALID> wrote:
> >> 
> >> Hi all,
> >> 
> >> In the current implementation, a RecordSerializer is created separately 
> >> for each subpartition in RecordWriter, which means the number of 
> >> serializers equals the number of subpartitions.
> >> With the broadcast partitioner, every record is serialized once per 
> >> subpartition, and this repeated work may hurt performance.
> >> In theory every record can be serialized only once and referenced for all 
> >> the subpartitions in broadcast mode.
> >> 
> >> To do so, I propose the following changes:
> >> 1. Create and maintain only one serializer in RecordWriter, which will 
> >> serialize the record for all the subpartitions. This makes sense for any 
> >> partitioner, and it also decreases the memory overhead, because every 
> >> serializer maintains some separate byte buffers internally.
> >> 2. Maybe we can abstract the RecordWriter as a base class used for the 
> >> other partitioner modes and implement a BroadcastRecordWriter for 
> >> BroadcastPartitioner. This new implementation will add buffer 
> >> references based on the number of subpartitions before adding the buffer 
> >> to the subpartition queues.
> >> 3. Maybe we can remove StreamRecordWriter by migrating the flusher from it 
> >> to RecordWriter, so that a uniform RecordWriter can be used for both 
> >> stream and batch. The above BroadcastRecordWriter can also be uniform for 
> >> both stream and batch.
> >> 
> >> I am not sure whether this improvement has been proposed before. What do 
> >> you think of it?
> >> If necessary I can create JIRAs to contribute it, and I may need one 
> >> committer to cooperate with me.
> >> 
> >> Best,
> >> 
> >> Zhijiang
> > 
> 
