curcur commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r505277649
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +225,62 @@ protected void releaseInternal() {
         }
     }
 
-    private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
-        final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
-        if (bufferBuilder != null) {
-            return bufferBuilder;
-        }
+    private BufferBuilder getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws IOException {
+        final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+        subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), 0);
 
-        return getNewSubpartitionBufferBuilder(targetSubpartition);
+        return bufferBuilder;
     }
 
-    private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+    private BufferBuilder getNewEmptySubpartitionBufferBuilderForRecordContinuation(
+            final ByteBuffer remainingRecordBytes,
+            final int targetSubpartition) throws IOException {
+        final BufferBuilder bufferBuilder = requestNewSubpartitionBufferBuilder(targetSubpartition);
+        final int partialRecordBytes = bufferBuilder.appendAndCommit(remainingRecordBytes);
+        subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(), partialRecordBytes);
+
+        return bufferBuilder;
+    }
+
+    private BufferBuilder requestNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
         checkInProduceState();
         ensureUnicastMode();
-
         final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
-        subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
         subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+
         return bufferBuilder;
     }
 
-    private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-        if (broadcastBufferBuilder != null) {
-            return broadcastBufferBuilder;
+    private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord() throws IOException {
+        final BufferBuilder bufferBuilder = requestNewBroadcastBufferBuilder();
+        try (final BufferConsumer consumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+            for (ResultSubpartition subpartition : subpartitions) {
+                subpartition.add(consumer.copy(), 0);
+            }
         }
 
-        return getNewBroadcastBufferBuilder();
+        return bufferBuilder;

Review comment:
I've rewritten the first commit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org