wsry commented on a change in pull request #15192: URL: https://github.com/apache/flink/pull/15192#discussion_r595702663
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java ########## @@ -212,31 +269,46 @@ private void flushCurrentSortBuffer() throws IOException { if (currentSortBuffer.hasRemaining()) { fileWriter.startNewRegion(); + List<BufferWithChannel> toWrite = new ArrayList<>(); + Queue<MemorySegment> segments = getWriteBuffers(); + while (currentSortBuffer.hasRemaining()) { - BufferWithChannel bufferWithChannel = - currentSortBuffer.copyIntoSegment(writeBuffer); - Buffer buffer = bufferWithChannel.getBuffer(); - int subpartitionIndex = bufferWithChannel.getChannelIndex(); + if (segments.isEmpty()) { + fileWriter.writeBuffers(toWrite); + toWrite.clear(); + segments = getWriteBuffers(); + } - writeCompressedBufferIfPossible(buffer, subpartitionIndex); + BufferWithChannel bufferWithChannel = + currentSortBuffer.copyIntoSegment(checkNotNull(segments.poll())); + toWrite.add(compressBufferIfPossible(bufferWithChannel)); } + + fileWriter.writeBuffers(toWrite); } currentSortBuffer.release(); } - private void writeCompressedBufferIfPossible(Buffer buffer, int targetSubpartition) - throws IOException { - updateStatistics(buffer, targetSubpartition); + private Queue<MemorySegment> getWriteBuffers() { + synchronized (lock) { + checkState(!writeBuffers.isEmpty(), "Task has been canceled."); + return new ArrayDeque<>(writeBuffers); + } + } - try { - if (canBeCompressed(buffer)) { - buffer = bufferCompressor.compressToIntermediateBuffer(buffer); - } - fileWriter.writeBuffer(buffer, targetSubpartition); - } finally { - buffer.recycleBuffer(); + private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) { + Buffer buffer = bufferWithChannel.getBuffer(); + int channelIndex = bufferWithChannel.getChannelIndex(); + + updateStatistics(buffer, channelIndex); + + if (!canBeCompressed(buffer)) { + return bufferWithChannel; } + + buffer = checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer); Review comment: We have only one IntermediateBuffer in the compressor, after this patch, we may cache multiple data buffers in the result partition, so the single IntermediateBuffer can not be shared by multiple buffers. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org