AHeise commented on pull request #12353: URL: https://github.com/apache/flink/pull/12353#issuecomment-635203522
> Hey @AHeise, I've spent some time walking through the code. I think the bug is clear: the consumer's reference counter of `bufferBuilder` for `randomEmit` is not counted correctly. But I am a bit unsure whether this fix works as expected when the `record` passed to `randomEmit` is larger than one buffer can hold. I am not an expert, so I could be wrong :-)
>
> In `BroadcastRandomWriter#randomEmit`, the randomly triggered data is emitted by the line `emit(record, targetChannelIndex);`
>
> Tracing the code down, the data is eventually serialized through `RecordWriter#copyFromSerializerToTargetChannel`
>
> and there, `requestNewBufferBuilder` is called multiple times if the data to serialize is larger than one buffer can hold.
>
> And as you can see, all the references to the remaining bufferBuilders are lost. Since the `addConsumer` in `randomEmit` is done after `emit(record, targetChannelIndex)`, it looks that way purely from the code perspective.
>
> But I am not very familiar with this part of the code, so I am not sure whether this will cause real problems.
> Or we can include a test to see whether it works as expected in this case?

I think this is actually by design (this PR does not change that behavior, which comes from @zhijiangW). If a randomly emitted record is longer than one buffer, only the last buffer is ultimately shared between all channels. The earlier buffers contain no relevant data for the non-target channels.
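To make the "only the last buffer is shared" point concrete, here is a toy model of that behavior. It is only a sketch and not Flink code: `RandomEmitSketch`, `Consumer`, `BUFFER_SIZE`, and the simplification that every emit starts in a fresh buffer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names are Flink classes or methods.
class RandomEmitSketch {
    /** Hypothetical stand-in for a buffer consumer: which buffer it reads and from where. */
    static final class Consumer {
        final int bufferId;
        final int readerPosition;

        Consumer(int bufferId, int readerPosition) {
            this.bufferId = bufferId;
            this.readerPosition = readerPosition;
        }

        @Override
        public String toString() {
            return "buffer " + bufferId + " @" + readerPosition;
        }
    }

    static final int BUFFER_SIZE = 4; // assumed tiny buffer capacity, in bytes

    final List<List<Consumer>> channels = new ArrayList<>();
    int currentBufferId = -1;
    int committedBytes = 0;

    RandomEmitSketch(int numberOfChannels) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Emits a record of the given length to one channel, then shares the last buffer. */
    void randomEmit(int recordLength, int targetChannel) {
        if (recordLength <= 0) {
            return;
        }
        int remaining = recordLength;
        while (remaining > 0) {
            // Each requested buffer is registered with the target channel only.
            currentBufferId++;
            channels.get(targetChannel).add(new Consumer(currentBufferId, 0));
            committedBytes = Math.min(remaining, BUFFER_SIZE);
            remaining -= committedBytes;
        }
        // After the emit, only the last buffer is handed to the other channels.
        // Their reader position starts past the record, so they skip its bytes.
        for (int channel = 0; channel < channels.size(); channel++) {
            if (channel != targetChannel) {
                channels.get(channel).add(new Consumer(currentBufferId, committedBytes));
            }
        }
    }

    public static void main(String[] args) {
        RandomEmitSketch writer = new RandomEmitSketch(3);
        writer.randomEmit(10, 1); // a 10-byte record spans three 4-byte buffers
        for (int channel = 0; channel < writer.channels.size(); channel++) {
            System.out.println("channel " + channel + ": " + writer.channels.get(channel));
        }
    }
}
```

In this model the earlier buffers are never registered with the non-target channels, and those channels would have nothing to read from them anyway, since the buffers hold only bytes of the randomly emitted record.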