Hi
> 1. I want to define a new AbstractRecordWriter as a base class which defines
> some abstract methods and utility code. The current RecordWriter, used for
> the other partitioners, and a new BroadcastRecordWriter, used only for
> BroadcastPartitioner, will both extend AbstractRecordWriter. The fields in
> BroadcastRecordWriter are exactly as you showed below, and the current
> RecordWriter would also need only one RecordSerializer if we make the
> RecordSerializer stateless.
Let's first discuss what we would like to have/implement on a higher level and
later focus on implementation details. Regarding making RecordSerializer
stateless, there were some discussions about it previously and it was on our
TODO list, but I don’t remember what was holding us back. Maybe Nico will
remember?
>
> 2. You pointed out the key problem of how to handle `randomEmit` in
> BroadcastRecordWriter, and I think this may reuse the `emit` logic of the
> current RecordWriter. Then the `emit` and `broadcastEmit` logic in
> BroadcastRecordWriter will serialize data only once and copy it to the
> BufferBuilder only once. So this improvement is deterministic for BroadcastPartitioner.
>
What logic to reuse do you have in mind?
>
> 4. As for the `broadcastEmit` improvement in RecordWriter for non-broadcast
> partitioners, we could also do as you suggested in option [2], but it would have to
> finish/flush the previous BufferBuilder generated by a common `emit` operation.
> That may hurt buffer utilization, which was improved considerably by the
> event-driven flush feature. So I am not sure whether the `broadcastEmit`
> improvement in RecordWriter is worth doing.
>
The whole point of my proposal [c] was to avoid the need to flush. Code would
need a little bit more refactoring but it should look something like this:
void broadcastEmit(record):
    serializedRecord = serializer.serialize(record)
    for bufferBuilder in bufferBuilders:
        bufferBuilder.append(serializedRecord)
        // if we overfilled bufferBuilder, finish it, request new one and continue writing

void emit(record, channel):
    serializedRecord = serializer.serialize(record)
    bufferBuilders[channel].append(serializedRecord)
    // if we overfilled bufferBuilder, finish it, request new one and continue writing
I do not see a need for additional flushes here, and it should be a strict
improvement over the current code base.
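A minimal Java sketch of that pseudocode, to show that finishing a full builder and continuing in a fresh one never forces a flush of the other channels. It assumes a simplified length-prefixed String serialization; `SimpleBufferBuilder` and `SerializeOnceWriter` are hypothetical names, not Flink's `BufferBuilder`/`RecordWriter` API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of proposal [c]: serialize once, copy into per-channel builders. */
public class SerializeOnceSketch {

    /** Fixed-capacity buffer that is "finished" once it is full. */
    static final class SimpleBufferBuilder {
        final ByteBuffer data;
        SimpleBufferBuilder(int capacity) { data = ByteBuffer.allocate(capacity); }

        /** Copies as many bytes as fit, starting at offset; returns how many were copied. */
        int append(byte[] src, int offset) {
            int n = Math.min(data.remaining(), src.length - offset);
            data.put(src, offset, n);
            return n;
        }

        boolean isFull() { return !data.hasRemaining(); }
    }

    static final class SerializeOnceWriter {
        final int bufferSize;
        final SimpleBufferBuilder[] current;
        // Finished builders per channel; stands in for handing them to the subpartition.
        final List<List<SimpleBufferBuilder>> finished = new ArrayList<>();
        int serializations = 0;

        SerializeOnceWriter(int channels, int bufferSize) {
            this.bufferSize = bufferSize;
            this.current = new SimpleBufferBuilder[channels];
            for (int i = 0; i < channels; i++) {
                current[i] = new SimpleBufferBuilder(bufferSize);
                finished.add(new ArrayList<>());
            }
        }

        byte[] serialize(String record) {
            serializations++;
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            // 4-byte length prefix followed by the payload
            return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
        }

        void emit(String record, int channel) {
            copyToChannel(serialize(record), channel);
        }

        void broadcastEmit(String record) {
            byte[] serialized = serialize(record); // serialized once
            for (int channel = 0; channel < current.length; channel++) {
                copyToChannel(serialized, channel); // copied once per channel
            }
        }

        /** Appends; a full builder is finished and replaced, other channels are untouched. */
        private void copyToChannel(byte[] serialized, int channel) {
            int offset = 0;
            while (offset < serialized.length) {
                offset += current[channel].append(serialized, offset);
                if (current[channel].isFull()) {
                    finished.get(channel).add(current[channel]);
                    current[channel] = new SimpleBufferBuilder(bufferSize);
                }
            }
        }

        int bytesWritten(int channel) {
            return finished.get(channel).size() * bufferSize + current[channel].data.position();
        }
    }

    public static void main(String[] args) {
        SerializeOnceWriter writer = new SerializeOnceWriter(2, 8);
        writer.emit("a", 0);        // 4 + 1 = 5 bytes to channel 0
        writer.broadcastEmit("wm"); // 4 + 2 = 6 bytes to each channel
        System.out.println(writer.serializations);  // 2
        System.out.println(writer.bytesWritten(0)); // 11
        System.out.println(writer.bytesWritten(1)); // 6
    }
}
```

Because each channel keeps its own current builder, interleaving `emit` and `broadcastEmit` only changes how full each channel's builder is; nothing has to be flushed early.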
>
> I have already implemented a demo covering points 1, 2 and 5 above. I can create JIRAs
> once we reach a final agreement, and then maybe you can help review the PR if you have
> time. :)
>
Sure :)
Piotrek
>
> Best,
>
> Zhijiang
> ------------------------------------------------------------------
> From: Piotr Nowojski <[email protected]>
> Sent: Wednesday, July 18, 2018 16:37
> To: dev <[email protected]>; Zhijiang(wangzhijiang999)
> <[email protected]>
> Subject: Re: [DISCUSS] Improve broadcast serialization
>
> Hi,
>
> A couple more thoughts:
>
> a) I’m not sure you would have to modify the current RecordWriter at all. You
> could extract an interface from the current RecordWriter and just provide two
> implementations: the current one and BroadcastRecordWriter. I’m not sure, but it
> doesn’t seem like they would duplicate/share lots of code.
> BroadcastRecordWriter would have fields:
>
> private final RecordSerializer<T> serializer;
>
> private Optional<BufferBuilder> bufferBuilder;
>
> Compare this to the per-channel arrays in RecordWriter.
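A minimal, self-contained sketch of that single-serializer, single-buffer layout, assuming the broadcast-only semantics of option [1] from the earlier mail: every channel reads the same bytes, and a non-broadcast write fails. `SharedBuffer` and `Consumer` are hypothetical stand-ins for BufferBuilder and the per-channel BufferConsumers, not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BroadcastOnlySketch {

    /** Append-only buffer shared by all channels (hypothetical stand-in for BufferBuilder). */
    static final class SharedBuffer {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        void append(byte[] data) { bytes.write(data, 0, data.length); }
        byte[] snapshot() { return bytes.toByteArray(); }
    }

    /** Per-channel reader with its own read offset (stand-in for BufferConsumer). */
    static final class Consumer {
        private final SharedBuffer buffer;
        private int readOffset = 0;
        Consumer(SharedBuffer buffer) { this.buffer = buffer; }

        /** Returns all bytes appended since this consumer's previous call. */
        byte[] build() {
            byte[] all = buffer.snapshot();
            byte[] slice = Arrays.copyOfRange(all, readOffset, all.length);
            readOffset = all.length;
            return slice;
        }
    }

    static final class BroadcastOnlyWriter {
        final SharedBuffer buffer = new SharedBuffer(); // one buffer for all channels
        final Consumer[] consumers;
        int serializations = 0;

        BroadcastOnlyWriter(int channels) {
            consumers = new Consumer[channels];
            for (int i = 0; i < channels; i++) consumers[i] = new Consumer(buffer);
        }

        void broadcastEmit(String record) {
            serializations++; // serialized exactly once, regardless of the channel count
            buffer.append(record.getBytes(StandardCharsets.UTF_8));
        }

        void emit(String record, int channel) {
            // Option [1]: channels must always see identical data
            throw new UnsupportedOperationException("non-broadcast writes are not supported");
        }
    }

    public static void main(String[] args) {
        BroadcastOnlyWriter writer = new BroadcastOnlyWriter(3);
        writer.broadcastEmit("ab");
        writer.broadcastEmit("c");
        System.out.println(writer.serializations); // 2
        System.out.println(new String(writer.consumers[2].build(), StandardCharsets.UTF_8)); // abc
    }
}
```

The snapshot copy in `build()` keeps the sketch short; a real implementation would expose a slice of the same memory instead of copying it. The throwing `emit` is exactly what latency markers and `randomEmit` run into.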
>
> b) One thing that I have only just noticed is latency markers and the randomEmit method.
> Latency markers are sent via randomEmit to a single, randomly chosen channel, which
> prevents us from implementing option [1]: BroadcastRecordWriter would have to
> flush all channels on randomEmit (as I proposed in option [2]).
>
> c) Another option to optimise broadcast writes (or, for that matter, all
> multi-channel writes) would be to serialise each record only once to
> SpanningRecordSerializer#serializationBuffer, but copy it multiple times to
> separate BufferBuilders. That would save us much more than half of the
> overhead (serialisation is more costly than data copying), while
> avoiding problems with the uneven state of channels. There would be no
> problems with mixed broadcast/non-broadcast writes; this option could support
> both of them at the same time. In other words, it would be as generic as the
> current one.
>
> d) Regarding StreamRecordWriter, another option is that it could be refactored
> into a class implementing the extracted RecordWriter interface and acting as a
> proxy/wrapper around another RecordWriter instance:
>
> class StreamRecordWriter implements RecordWriter {
>     private final RecordWriter recordWriter; // either broadcast or non-broadcast
>     public void foo() {
>         recordWriter.foo();
>     }
> }
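A runnable sketch of that proxy idea. `Writer` is a hypothetical extracted interface, and the `flushAlways` flag is a simplified stand-in for the flushing behaviour StreamRecordWriter adds on top; the periodic flusher thread is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSketch {

    /** Hypothetical interface extracted from RecordWriter (not Flink's actual API). */
    interface Writer<T> {
        void emit(T record);
        void broadcastEmit(T record);
        void flushAll();
    }

    /** Stand-in for either the broadcast or the non-broadcast implementation; logs calls. */
    static final class RecordingWriter<T> implements Writer<T> {
        final List<String> calls = new ArrayList<>();
        public void emit(T record) { calls.add("emit " + record); }
        public void broadcastEmit(T record) { calls.add("broadcast " + record); }
        public void flushAll() { calls.add("flush"); }
    }

    /** Proxy that adds streaming-specific flushing on top of any Writer. */
    static final class StreamRecordWriter<T> implements Writer<T> {
        private final Writer<T> delegate; // either broadcast or non-broadcast
        private final boolean flushAlways;

        StreamRecordWriter(Writer<T> delegate, boolean flushAlways) {
            this.delegate = delegate;
            this.flushAlways = flushAlways;
        }

        @Override public void emit(T record) {
            delegate.emit(record);
            if (flushAlways) delegate.flushAll();
        }

        @Override public void broadcastEmit(T record) {
            delegate.broadcastEmit(record);
            if (flushAlways) delegate.flushAll();
        }

        @Override public void flushAll() { delegate.flushAll(); }
    }

    public static void main(String[] args) {
        RecordingWriter<String> inner = new RecordingWriter<>();
        Writer<String> writer = new StreamRecordWriter<>(inner, true);
        writer.emit("r1");
        writer.broadcastEmit("wm");
        System.out.println(inner.calls); // [emit r1, flush, broadcast wm, flush]
    }
}
```

The wrapper never needs to know which implementation it wraps, so the broadcast/non-broadcast choice and the streaming flushing stay independent of each other.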
>
> To be honest, I’m not sure at the moment which one would be better, [2] or [c].
> In an ideal world, we might want to replace the current RecordWriter with [c] and
> after that (if that’s not enough) implement [2] on top of [c].
>
> Piotrek
>
> > On 18 Jul 2018, at 05:36, Zhijiang(wangzhijiang999)
> > <[email protected]> wrote:
> >
> > Hi Piotr,
> >
> > Thanks for your replies and professional suggestions!
> >
> > My initial thought is just as you said in your first suggestion. The current
> > RecordWriter emits a StreamRecord to some subpartition via the
> > ChannelSelector, or broadcasts events/watermarks to all subpartitions directly.
> > If the ChannelSelector implementation is BroadcastPartitioner, then we can
> > create a specialized BroadcastRecordWriter to handle 'emit',
> > 'broadcastEmit', 'broadcastEvent', etc.
> > To make it less tricky, I want to abstract the RecordWriter as a
> > plugin, then implement a BroadcastRecordWriter and a NonBroadcastRecordWriter
> > separately, both extending the abstract RecordWriter. That means we divide
> > RecordWriter implementations by ChannelSelector, and we may also remove the current
> > StreamRecordWriter to unify the RecordWriter in both stream and
> > batch mode.
> >
> > Considering specific implementations, I think one RecordSerializer can work
> > for both BroadcastRecordWriter and NonBroadcastRecordWriter, but the
> > precondition is making the RecordSerializer stateless, so we
> > have to remove the BufferBuilder variable from SpanningRecordSerializer and
> > pass it via the addRecord/continueWritingWithNextBufferBuilder
> > methods from RecordWriter. BroadcastRecordWriter only needs to maintain one
> > BufferBuilder for all subpartitions, and NonBroadcastRecordWriter may need to
> > maintain one BufferBuilder per subpartition.
> >
> > Another issue is whether this improvement is suitable for
> > broadcastEmit(watermark) in NonBroadcastRecordWriter, as you said in
> > suggestions 2 and 3. I wonder whether it may decrease buffer utilization when switching
> > between broadcast and non-broadcast modes, and it may also be trickier to
> > implement. I am still thinking about it.
> >
> > Maybe we can implement the improvement for BroadcastPartitioner as a first
> > step and make sure there is one RecordSerializer for all subpartitions. That can
> > reduce the memory overhead of RecordSerializer and the time cost in
> > broadcast serialization scenarios.
> >
> > Best,
> >
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From: Piotr Nowojski <[email protected]>
> > Sent: Tuesday, July 17, 2018 23:31
> > To: dev <[email protected]>; Zhijiang(wangzhijiang999)
> > <[email protected]>
> > Subject: Re: [DISCUSS] Improve broadcast serialization
> >
> > Hi
> >
> > Generally speaking this would be a nice optimisation, however it might be
> > tricky to implement. The thing to keep in mind is that the current interface
> > allows interleaving broadcasting and normal sending; because of that, at any
> > given time some serialisers can have more data than others. For example,
> > when we have two output channels and we loop over the following writes:
> >
> > Write sth to channel 1
> > Broadcast to all channels
> > Write sth to channel 1
> > Broadcast to all channels
> > Write sth to channel 1
> > Broadcast to all channels
> > (…)
> >
> > Thus the buffers of different channels can fill up at different rates.
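A tiny simulation of that loop, counting the bytes queued per channel. It assumes every record is 10 bytes and maps channel 1 to index 0; `simulate` is a hypothetical helper for illustration only.

```java
import java.util.Arrays;

public class FillRateSketch {

    /** Bytes queued per channel after `rounds` of: one write to channel 0, then one broadcast. */
    static int[] simulate(int channels, int rounds, int recordSize) {
        int[] pending = new int[channels];
        for (int r = 0; r < rounds; r++) {
            pending[0] += recordSize;            // single-channel write
            for (int c = 0; c < channels; c++) { // broadcast to all channels
                pending[c] += recordSize;
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(2, 3, 10))); // [60, 30]
    }
}
```

Channel 0 receives twice as many bytes as channel 1, so with equal buffer sizes its buffers fill up, and have to be finished, twice as often; one shared buffer cannot stay aligned with both channels.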
> >
> >> In theory every record can be serialized only once and referenced for all
> >> the subpartitions in broadcast mode.
> >
> > The problem here is that after a record is serialised, the only unit that can
> > be referenced afterwards is a “Buffer”. That leaves us with a
> > couple of options:
> >
> > 1. Create a specialised BroadcastRecordWriter that supports ONLY
> > broadcasting, guaranteeing that all channels always receive the same data.
> > Here you could serialise records only once, to one BufferBuilder that could
> > be shared and referenced by multiple BufferConsumers from different
> > channels. Any non-broadcast write would have to fail.
> >
> > 2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in
> > 1. for broadcasts, but for any non-broadcast write: finish the current
> > broadcasting BufferBuilder, flush all data on all channels, serialise the
> > single record to a single channel using a newly created BufferBuilder and also
> > immediately finish/flush it, so that any subsequent broadcasts will work
> > again as in 1.
> >
> > 3. Similar to 2, but lazily switch between broadcasting and
> > non-broadcasting modes. It would have two modes of operation that could be
> > switched back and forth: the one currently implemented for
> > non-broadcast writes, and an optimised broadcast mode:
> >
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Broadcast to all channels
> > Write sth to X Channel // this flushes all channels and clears/finishes previous BufferBuilder
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to Y Channel
> > Write sth to X Channel
> > Broadcast to all channels // this flushes all channels and clears/finishes previous BufferBuilders
> > Broadcast to all channels
> > Broadcast to all channels
> > (…)
> >
> > However, in both 2. and 3. there would be a very big penalty for mixing
> > broadcasts with normal writes.
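To make that penalty concrete, the following sketch counts the flushes each option would trigger under a simplified cost model (an assumption, not measured behaviour): option 2 pays one all-channel flush plus one single-channel flush per non-broadcast write, and option 3 pays one all-channel flush per mode switch. Channels X and Y are mapped to 0 and 1.

```java
import java.util.List;

public class ModeSwitchSketch {
    static final int BROADCAST = -1; // marker for "Broadcast to all channels"

    /** Option 2: every non-broadcast write flushes all channels, then its own channel. */
    static int flushesOption2(List<Integer> writes) {
        int flushes = 0;
        for (int target : writes) {
            if (target != BROADCAST) {
                flushes += 2; // flush all channels before, flush the single channel after
            }
        }
        return flushes;
    }

    /** Option 3: flush all channels only when switching between the two modes. */
    static int flushesOption3(List<Integer> writes) {
        int flushes = 0;
        Boolean broadcasting = null; // no mode before the first write
        for (int target : writes) {
            boolean isBroadcast = target == BROADCAST;
            if (broadcasting != null && broadcasting != isBroadcast) {
                flushes++;
            }
            broadcasting = isBroadcast;
        }
        return flushes;
    }

    public static void main(String[] args) {
        int b = BROADCAST;
        // The grouped sequence from option 3 (X = 0, Y = 1)
        List<Integer> grouped = List.of(b, b, b, b, 0, 1, 1, 1, 0, b, b, b);
        // The interleaved pattern from earlier in this mail
        List<Integer> interleaved = List.of(0, b, 0, b, 0, b);
        System.out.println(flushesOption2(grouped) + " " + flushesOption3(grouped));         // 10 2
        System.out.println(flushesOption2(interleaved) + " " + flushesOption3(interleaved)); // 6 5
    }
}
```

For the grouped sequence, lazy switching pays off (2 flushes instead of 10), but for the interleaved pattern almost every write triggers a flush in both variants.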
> >
> > Piotrek
> >
> >> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999)
> >> <[email protected]> wrote:
> >>
> >> Hi all,
> >>
> >> In the current implementation, a RecordSerializer is created separately for
> >> each subpartition in RecordWriter, which means the number of serializers
> >> equals the number of subpartitions.
> >> For the broadcast partitioner, every record is serialized once for each of
> >> the subpartitions, and this may hurt performance to some extent.
> >> In theory every record can be serialized only once and referenced for all
> >> the subpartitions in broadcast mode.
> >>
> >> To do so, I propose the following changes:
> >> 1. Create and maintain only one serializer in RecordWriter, which will
> >> serialize the record for all the subpartitions. This makes sense for any
> >> partitioner, and the memory overhead can also be decreased, because every
> >> serializer maintains some separate byte buffers internally.
> >> 2. Maybe we can abstract the RecordWriter as a base class used for the other
> >> partitioner modes and implement a BroadcastRecordWriter for
> >> BroadcastPartitioner. This new implementation will add buffer
> >> references based on the number of subpartitions before adding the buffer into the
> >> subpartition queues.
> >> 3. Maybe we can remove StreamRecordWriter by migrating its flusher to
> >> RecordWriter; then the unified RecordWriter can be used for both streaming
> >> and batch. The above BroadcastRecordWriter can also be unified for both
> >> streaming and batch.
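Point 2 can be illustrated with a reference-counted buffer: the writer serializes once, takes one extra reference per additional subpartition before enqueueing, and the memory is recycled only after every subpartition has released it. `RefCountedBuffer` and its `retain`/`release` methods are hypothetical, loosely in the style of Netty's reference counting, not Flink's actual `Buffer` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class RefCountSketch {

    /** Hypothetical reference-counted buffer; memory is recycled when the count hits zero. */
    static final class RefCountedBuffer {
        final byte[] data;
        private int refCount = 1; // the creator holds the first reference
        private boolean recycled = false;

        RefCountedBuffer(byte[] data) { this.data = data; }

        synchronized void retain() {
            if (recycled) throw new IllegalStateException("buffer already recycled");
            refCount++;
        }

        synchronized void release() {
            if (--refCount == 0) recycled = true; // a real pool would take the memory back here
        }

        synchronized int refCount() { return refCount; }
        synchronized boolean isRecycled() { return recycled; }
    }

    /** Hypothetical writer that shares one serialized buffer across all subpartitions. */
    static final class BroadcastRecordWriter {
        final List<Queue<RefCountedBuffer>> subpartitions = new ArrayList<>();

        BroadcastRecordWriter(int numSubpartitions) {
            for (int i = 0; i < numSubpartitions; i++) subpartitions.add(new ArrayDeque<>());
        }

        void broadcast(RefCountedBuffer buffer) {
            // Retain before enqueueing, so a fast consumer cannot recycle the buffer
            // while it is still being added to the remaining queues.
            for (int i = 1; i < subpartitions.size(); i++) buffer.retain();
            for (Queue<RefCountedBuffer> queue : subpartitions) queue.add(buffer);
        }
    }

    public static void main(String[] args) {
        BroadcastRecordWriter writer = new BroadcastRecordWriter(3);
        RefCountedBuffer buffer = new RefCountedBuffer(new byte[] {1, 2, 3});
        writer.broadcast(buffer);
        System.out.println(buffer.refCount()); // 3
        for (Queue<RefCountedBuffer> queue : writer.subpartitions) {
            queue.poll().release(); // each subpartition releases after sending
        }
        System.out.println(buffer.isRecycled()); // true
    }
}
```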
> >>
> >> I am not sure whether this improvement has been proposed before. What do you
> >> think of it?
> >> If needed, I can create JIRAs to contribute it, and may need a
> >> committer to cooperate with me.
> >>
> >> Best,
> >>
> >> Zhijiang
> >
>