[ 
https://issues.apache.org/jira/browse/FLINK-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424169#comment-16424169
 ] 

Triones Deng commented on FLINK-9087:
-------------------------------------

[~yuzhih...@gmail.com] When I ran the test, I found the following in broadcastEvent:
{code:java}
public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException {
    try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
        .......
            // retain the buffer so that it can be recycled by each channel of targetPartition
            targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
        }
        .......
        return eventBufferConsumer;
    }
}
{code}

The code calls targetPartition.addBufferConsumer() with a copy of
eventBufferConsumer, so all the BufferConsumers produced by copy() share the
same buffer. Each copy() also calls AbstractReferenceCountedByteBuf.retain();
AbstractReferenceCountedByteBuf is a Netty class.

Every targetPartition implementation, such as
AbstractCollectingResultPartitionWriter and ResultPartition, calls close() on
the BufferConsumer it receives, so the buffer in eventBufferConsumer is
eventually released. ResultPartition calls notifyDataAvailable, and the data is
then consumed asynchronously. So I think we had better leave the return value
alone. What do you think? Or should we just change the method signature to
return void?
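
To illustrate the reference counting described above, here is a minimal sketch. It does not use Flink's or Netty's classes: SharedBuffer, Handle and broadcast are hypothetical stand-ins for the buffer, BufferConsumer and broadcastEvent, and the assumption that copy() takes one extra reference while close() gives one back is based only on the description in this comment.

```java
import java.util.ArrayList;
import java.util.List;

public class RefCountSketch {

    /** Hypothetical stand-in for a reference-counted buffer (not a Flink or Netty class). */
    static final class SharedBuffer {
        private int refCount = 1;        // the creator holds the first reference
        private boolean recycled = false;

        void retain() {
            if (recycled) throw new IllegalStateException("already recycled");
            refCount++;
        }

        void release() {
            if (recycled) throw new IllegalStateException("already recycled");
            if (--refCount == 0) recycled = true;
        }

        int refCount() { return refCount; }
        boolean isRecycled() { return recycled; }
    }

    /** Hypothetical stand-in for BufferConsumer: a closeable handle on a shared buffer. */
    static final class Handle implements AutoCloseable {
        private final SharedBuffer buffer;
        private boolean closed = false;

        Handle(SharedBuffer buffer) { this.buffer = buffer; }

        /** Creates another handle on the same buffer and takes one more reference. */
        Handle copy() {
            buffer.retain();
            return new Handle(buffer);
        }

        /** Gives back this handle's reference; closing twice is a no-op in this sketch. */
        @Override
        public void close() {
            if (!closed) {
                closed = true;
                buffer.release();
            }
        }

        boolean isClosed() { return closed; }
    }

    /** Same shape as broadcastEvent: one copy per channel, original closed by try-with-resources. */
    static Handle broadcast(SharedBuffer buffer, List<Handle> channels, int numChannels) {
        try (Handle original = new Handle(buffer)) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(original.copy());
            }
            return original; // try-with-resources closes it before the caller receives it
        }
    }

    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();
        List<Handle> channels = new ArrayList<>();
        Handle returned = broadcast(buffer, channels, 3);

        System.out.println("returned handle closed: " + returned.isClosed());
        System.out.println("references held by channels: " + buffer.refCount());

        // Each channel closes its own copy; the last close recycles the buffer.
        for (Handle h : channels) h.close();
        System.out.println("buffer recycled: " + buffer.isRecycled());
    }
}
```

In this sketch the handle that broadcast returns has already been closed by try-with-resources, and the buffer is recycled only once every channel has closed its copy. Whether a second close() on the real BufferConsumer is harmless is not something this sketch establishes.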

Note that FLINK-7315 plans to use Flink's buffers in Netty, and one of its
subtasks, FLINK-7518, has a solution. I am new here; any suggestions?

> Return value of broadcastEvent should be closed in 
> StreamTask#performCheckpoint
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9087
>                 URL: https://issues.apache.org/jira/browse/FLINK-9087
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
>         for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
>           try {
>             streamRecordWriter.broadcastEvent(message);
> {code}
> The BufferConsumer returned by broadcastEvent() should be closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
