wsry commented on a change in pull request #15192:
URL: https://github.com/apache/flink/pull/15192#discussion_r595702663



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException {
         if (currentSortBuffer.hasRemaining()) {
             fileWriter.startNewRegion();
 
+            List<BufferWithChannel> toWrite = new ArrayList<>();
+            Queue<MemorySegment> segments = getWriteBuffers();
+
             while (currentSortBuffer.hasRemaining()) {
-                BufferWithChannel bufferWithChannel =
-                        currentSortBuffer.copyIntoSegment(writeBuffer);
-                Buffer buffer = bufferWithChannel.getBuffer();
-                int subpartitionIndex = bufferWithChannel.getChannelIndex();
+                if (segments.isEmpty()) {
+                    fileWriter.writeBuffers(toWrite);
+                    toWrite.clear();
+                    segments = getWriteBuffers();
+                }
 
-                writeCompressedBufferIfPossible(buffer, subpartitionIndex);
+                BufferWithChannel bufferWithChannel =
+                        
currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll()));
+                toWrite.add(compressBufferIfPossible(bufferWithChannel));
             }
+
+            fileWriter.writeBuffers(toWrite);
         }
 
         currentSortBuffer.release();
     }
 
-    private void writeCompressedBufferIfPossible(Buffer buffer, int 
targetSubpartition)
-            throws IOException {
-        updateStatistics(buffer, targetSubpartition);
+    private Queue<MemorySegment> getWriteBuffers() {
+        synchronized (lock) {
+            checkState(!writeBuffers.isEmpty(), "Task has been canceled.");
+            return new ArrayDeque<>(writeBuffers);
+        }
+    }
 
-        try {
-            if (canBeCompressed(buffer)) {
-                buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
-            }
-            fileWriter.writeBuffer(buffer, targetSubpartition);
-        } finally {
-            buffer.recycleBuffer();
+    private BufferWithChannel compressBufferIfPossible(BufferWithChannel 
bufferWithChannel) {
+        Buffer buffer = bufferWithChannel.getBuffer();
+        int channelIndex = bufferWithChannel.getChannelIndex();
+
+        updateStatistics(buffer, channelIndex);
+
+        if (!canBeCompressed(buffer)) {
+            return bufferWithChannel;
         }
+
+        buffer = 
checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);

Review comment:
       We have only one intermediate buffer in the compressor. After this patch, we may cache multiple data buffers in the result partition before flushing them, so that single intermediate buffer cannot be shared by multiple buffers at once — which is why `compressToOriginalBuffer` is used here instead of `compressToIntermediateBuffer`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to