curcur commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r505183102



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +225,62 @@ protected void releaseInternal() {
                }
        }
 
-       private BufferBuilder getSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
-               final BufferBuilder bufferBuilder = 
subpartitionBufferBuilders[targetSubpartition];
-               if (bufferBuilder != null) {
-                       return bufferBuilder;
-               }
+       private BufferBuilder 
getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws 
IOException {
+               final BufferBuilder bufferBuilder = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+               
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(),
 0);
 
-               return getNewSubpartitionBufferBuilder(targetSubpartition);
+               return bufferBuilder;
        }
 
-       private BufferBuilder getNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
+       private BufferBuilder 
getNewEmptySubpartitionBufferBuilderForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes,
+                       final int targetSubpartition) throws IOException {
+               final BufferBuilder bufferBuilder = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+               final int partialRecordBytes = 
bufferBuilder.appendAndCommit(remainingRecordBytes);
+               
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(),
 partialRecordBytes);
+
+               return bufferBuilder;
+       }
+
+       private BufferBuilder requestNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
                checkInProduceState();
                ensureUnicastMode();
-
                final BufferBuilder bufferBuilder = 
requestNewBufferBuilderFromPool(targetSubpartition);
-               
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
                subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+
                return bufferBuilder;
        }
 
-       private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-               if (broadcastBufferBuilder != null) {
-                       return broadcastBufferBuilder;
+       private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord() 
throws IOException {
+               final BufferBuilder bufferBuilder = 
requestNewBroadcastBufferBuilder();
+               try (final BufferConsumer consumer = 
bufferBuilder.createBufferConsumerFromBeginning()) {
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               subpartition.add(consumer.copy(), 0);
+                       }
                }
 
-               return getNewBroadcastBufferBuilder();
+               return bufferBuilder;
        }
 
-       private BufferBuilder getNewBroadcastBufferBuilder() throws IOException 
{
+       private BufferBuilder 
getNewEmptyBroadcastBufferBuilderForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes) throws 
IOException {
+                       final BufferBuilder bufferBuilder = 
requestNewBroadcastBufferBuilder();
+               final int partialRecordBytes = 
bufferBuilder.appendAndCommit(remainingRecordBytes);
+               try (final BufferConsumer consumer = 
bufferBuilder.createBufferConsumerFromBeginning()) {
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               subpartition.add(consumer.copy(), 
partialRecordBytes);

Review comment:
       I think that's roughly the same (from the complexity point of view).
   
   The only difference is easier to differentiate when remainingRecordLength == 
buffer.size()
   
   But I think that's not that much big difference.
   
   I agree the original code is a bit duplicated. I've rewritten it (literally 
just rewrite), to see whether it looks much better now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to