This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c00877260c0 [FLINK-36021][network] Delegating the responsibility for compression to every tier c00877260c0 is described below commit c00877260c0dcce8cfd75b7d3a43cf6f68dcc387 Author: Weijie Guo <res...@163.com> AuthorDate: Fri Aug 9 15:16:57 2024 +0800 [FLINK-36021][network] Delegating the responsibility for compression to every tier --- .../hybrid/tiered/common/TieredStorageUtils.java | 21 +++++++++++++++++++ .../shuffle/TieredResultPartitionFactory.java | 9 +++++--- .../storage/TieredStorageProducerClient.java | 24 ++++------------------ .../partition/hybrid/tiered/tier/TierFactory.java | 6 +++++- .../hybrid/tiered/tier/disk/DiskTierFactory.java | 9 ++++++-- .../tiered/tier/disk/DiskTierProducerAgent.java | 15 ++++++++++---- .../tiered/tier/memory/MemoryTierFactory.java | 9 ++++++-- .../tier/memory/MemoryTierProducerAgent.java | 16 +++++++++++---- .../tiered/tier/remote/RemoteTierFactory.java | 9 ++++++-- .../tier/remote/RemoteTierProducerAgent.java | 16 +++++++++++---- .../hybrid/tiered/storage/TestingTierFactory.java | 6 +++++- .../tier/disk/DiskTierProducerAgentTest.java | 3 ++- .../tier/memory/MemoryTierProducerAgentTest.java | 6 ++++-- 13 files changed, 103 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java index 8565daf14d2..328573050da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator; @@ -32,6 +33,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.R import java.nio.ByteBuffer; import java.util.List; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** Utils for reading from or writing to tiered storage. */ public class TieredStorageUtils { @@ -128,4 +131,22 @@ public class TieredStorageUtils { bufferWithHeaders[index] = header; bufferWithHeaders[index + 1] = buffer.getNioBufferReadable(); } + + /** Try compress buffer if possible. */ + public static Buffer compressBufferIfPossible( + Buffer buffer, BufferCompressor bufferCompressor) { + if (!canBeCompressed(buffer, bufferCompressor)) { + return buffer; + } + + return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer); + } + + /** + * Whether the buffer can be compressed or not. Note that event is not compressed because it is + * usually small and the size can become even larger after compression. + */ + public static boolean canBeCompressed(Buffer buffer, BufferCompressor bufferCompressor) { + return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java index 4a00401c4d9..9aaaad28f03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java @@ -131,7 +131,8 @@ public class TieredResultPartitionFactory { tierShuffleDescriptors, fileChannelManager, batchShuffleReadBufferPool, - batchShuffleReadIOExecutor); + batchShuffleReadIOExecutor, + bufferCompressor); // Create producer client. TieredStorageProducerClient tieredStorageProducerClient = @@ -199,7 +200,8 @@ public class TieredResultPartitionFactory { List<TierShuffleDescriptor> tierShuffleDescriptors, FileChannelManager fileChannelManager, BatchShuffleReadBufferPool batchShuffleReadBufferPool, - ScheduledExecutorService batchShuffleReadIOExecutor) { + ScheduledExecutorService batchShuffleReadIOExecutor, + @Nullable BufferCompressor bufferCompressor) { List<TierProducerAgent> tierProducerAgents = new ArrayList<>(); List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new ArrayList<>(); @@ -238,7 +240,8 @@ public class TieredResultPartitionFactory { Collections.singletonList(tierShuffleDescriptors.get(index)), Math.max( 2 * batchShuffleReadBufferPool.getNumBuffersPerRequest(), - numberOfSubpartitions)); + numberOfSubpartitions), + bufferCompressor); tierProducerAgents.add(producerAgent); tieredStorageMemorySpecs.add(tierFactory.getProducerAgentMemorySpec()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java index 9a03aef0005..64edce34c49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java @@ -144,21 +144,21 @@ public class TieredStorageProducerClient { TieredStorageSubpartitionId subpartitionId, Buffer accumulatedBuffer, int numRemainingConsecutiveBuffers) { + int unCompressedSize = accumulatedBuffer.readableBytes(); try { - Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer); if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) { chooseStorageTierToStartSegment(subpartitionId, numRemainingConsecutiveBuffers + 1); } if (!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite( subpartitionId, - compressedBuffer, + accumulatedBuffer, bufferAccumulator, numRemainingConsecutiveBuffers)) { chooseStorageTierToStartSegment(subpartitionId, numRemainingConsecutiveBuffers + 1); checkState( currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite( subpartitionId, - compressedBuffer, + accumulatedBuffer, bufferAccumulator, numRemainingConsecutiveBuffers), "Failed to write the first buffer to the new segment"); @@ -167,7 +167,7 @@ public class TieredStorageProducerClient { accumulatedBuffer.recycleBuffer(); ExceptionUtils.rethrow(ioe); } - updateMetricStatistics(1, accumulatedBuffer.readableBytes()); + updateMetricStatistics(1, unCompressedSize); } private void chooseStorageTierToStartSegment( @@ -188,22 +188,6 @@ public class TieredStorageProducerClient { throw new IOException("Failed to choose a storage tier to start a new segment."); } - private Buffer compressBufferIfPossible(Buffer buffer) { - if (!canBeCompressed(buffer)) { - return buffer; - } - - return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer); - } - - /** - * Whether the buffer can be compressed or not. Note that event is not compressed because it is - * usually small and the size can become even larger after compression. - */ - private boolean canBeCompressed(Buffer buffer) { - return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0; - } - private void updateMetricStatistics(int numWriteBuffersDelta, int numWriteBytesDelta) { checkNotNull(metricStatisticsUpdater) .accept( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java index fbfce81f2ee..11fb089a4b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; @@ -27,6 +28,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Tiere import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import javax.annotation.Nullable; + import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -61,7 +64,8 @@ public interface TierFactory { BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, List<TierShuffleDescriptor> shuffleDescriptors, - int maxRequestedBuffer); + int maxRequestedBuffer, + @Nullable BufferCompressor bufferCompressor); /** Creates the consumer-side agent of a Tier. */ TierConsumerAgent createConsumerAgent( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java index 786aab43c5f..c3a1ee2c4f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex; @@ -38,6 +39,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; +import javax.annotation.Nullable; + import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; @@ -106,7 +109,8 @@ public class DiskTierFactory implements TierFactory { BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, List<TierShuffleDescriptor> shuffleDescriptors, - int maxRequestedBuffers) { + int maxRequestedBuffers, + @Nullable BufferCompressor bufferCompressor) { checkState(bufferSizeBytes > 0); ProducerMergedPartitionFileIndex partitionFileIndex = @@ -139,7 +143,8 @@ public class DiskTierFactory implements TierFactory { bufferPool, ioExecutor, maxRequestedBuffers, - DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT); + DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT, + bufferCompressor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java index e8d35ea6e39..366a64e033c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName; import static org.apache.flink.util.Preconditions.checkArgument; @@ -80,6 +82,8 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro private final DiskIOScheduler diskIOScheduler; + private final BufferCompressor bufferCompressor; + private volatile boolean isReleased; DiskTierProducerAgent( @@ -99,7 +103,8 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, int maxRequestedBuffers, - Duration bufferRequestTimeout) { + Duration bufferRequestTimeout, + BufferCompressor bufferCompressor) { checkArgument( numBytesPerSegment >= bufferSizeBytes, "One segment should contain at least one buffer."); @@ -112,6 +117,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro this.memoryManager = memoryManager; this.firstBufferIndexInSegment = new ArrayList<>(); this.currentSubpartitionWriteBuffers = new int[numSubpartitions]; + this.bufferCompressor = bufferCompressor; for (int i = 0; i < numSubpartitions; ++i) { // Each map is used to store the segment ids belonging to a subpartition. The map can be @@ -177,11 +183,12 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro currentSubpartitionWriteBuffers[subpartitionIndex] = 0; return false; } - if (finishedBuffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, getDiskTierName(), finishedBuffer); + Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, bufferCompressor); + if (compressedBuffer.isBuffer()) { + memoryManager.transferBufferOwnership(bufferOwner, getDiskTierName(), compressedBuffer); } currentSubpartitionWriteBuffers[subpartitionIndex]++; - emitBuffer(finishedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0); + emitBuffer(compressedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0); return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java index b2d84acb364..8ef55c6b59e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; @@ -34,6 +35,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; +import javax.annotation.Nullable; + import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -91,7 +94,8 @@ public class MemoryTierFactory implements TierFactory { BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, List<TierShuffleDescriptor> shuffleDescriptors, - int maxRequestedBuffers) { + int maxRequestedBuffers, + @Nullable BufferCompressor bufferCompressor) { checkState(bufferSizeBytes > 0); return new MemoryTierProducerAgent( @@ -103,7 +107,8 @@ public class MemoryTierFactory implements TierFactory { isBroadcastOnly, memoryManager, nettyService, - resourceRegistry); + resourceRegistry, + bufferCompressor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java index 973b8cdf741..54ba4cdd71f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java @@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; @@ -40,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName; import static org.apache.flink.util.Preconditions.checkArgument; @@ -66,6 +68,8 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP private final MemoryTierSubpartitionProducerAgent[] subpartitionProducerAgents; + private final BufferCompressor bufferCompressor; + public MemoryTierProducerAgent( TieredStoragePartitionId partitionId, int numSubpartitions, @@ -75,7 +79,8 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP boolean isBroadcastOnly, TieredStorageMemoryManager memoryManager, TieredStorageNettyService nettyService, - TieredStorageResourceRegistry resourceRegistry) { + TieredStorageResourceRegistry resourceRegistry, + BufferCompressor bufferCompressor) { checkArgument( segmentSizeBytes >= bufferSizeBytes, "One segment should contain at least one buffer."); @@ -89,6 +94,7 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP this.currentSubpartitionWriteBuffers = new int[numSubpartitions]; this.nettyConnectionEstablished = new boolean[numSubpartitions]; this.subpartitionProducerAgents = new MemoryTierSubpartitionProducerAgent[numSubpartitions]; + this.bufferCompressor = bufferCompressor; Arrays.fill(currentSubpartitionWriteBuffers, 0); nettyService.registerProducer(partitionId, this); @@ -138,11 +144,13 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP currentSubpartitionWriteBuffers[subpartitionIndex] = 0; return false; } - if (finishedBuffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, getMemoryTierName(), finishedBuffer); + Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, bufferCompressor); + if (compressedBuffer.isBuffer()) { + memoryManager.transferBufferOwnership( + bufferOwner, getMemoryTierName(), compressedBuffer); } currentSubpartitionWriteBuffers[subpartitionIndex]++; - addFinishedBuffer(finishedBuffer, subpartitionIndex); + addFinishedBuffer(compressedBuffer, subpartitionIndex); return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java index 5c0f72ecea5..3d374715a9b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter; @@ -37,6 +38,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; +import javax.annotation.Nullable; + import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -101,7 +104,8 @@ public class RemoteTierFactory implements TierFactory { BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, List<TierShuffleDescriptor> shuffleDescriptors, - int maxRequestedBuffers) { + int maxRequestedBuffers, + @Nullable BufferCompressor bufferCompressor) { checkState(bufferSizeBytes > 0); checkNotNull(remoteStoragePath); @@ -115,7 +119,8 @@ public class RemoteTierFactory implements TierFactory { isBroadcastOnly, partitionFileWriter, storageMemoryManager, - resourceRegistry); + resourceRegistry, + bufferCompressor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java index 8444c29fec9..e3cfcb960aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter; @@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd import java.util.Arrays; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName; import static org.apache.flink.util.Preconditions.checkArgument; @@ -44,6 +46,8 @@ public class RemoteTierProducerAgent implements TierProducerAgent { private final int[] currentSubpartitionSegmentWriteBuffers; + private final BufferCompressor bufferCompressor; + RemoteTierProducerAgent( TieredStoragePartitionId partitionId, int numSubpartitions, @@ -52,7 +56,8 @@ public class RemoteTierProducerAgent implements TierProducerAgent { boolean isBroadcastOnly, PartitionFileWriter partitionFileWriter, TieredStorageMemoryManager memoryManager, - TieredStorageResourceRegistry resourceRegistry) { + TieredStorageResourceRegistry resourceRegistry, + BufferCompressor bufferCompressor) { checkArgument( numBytesPerSegment >= bufferSizeBytes, "One segment should contain at least one buffer."); @@ -67,6 +72,7 @@ public class RemoteTierProducerAgent implements TierProducerAgent { memoryManager, partitionFileWriter); this.currentSubpartitionSegmentWriteBuffers = new int[numSubpartitions]; + this.bufferCompressor = bufferCompressor; Arrays.fill(currentSubpartitionSegmentWriteBuffers, 0); resourceRegistry.registerResource(partitionId, this::releaseAllResources); } @@ -95,11 +101,13 @@ public class RemoteTierProducerAgent implements TierProducerAgent { currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0; return false; } - if (buffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, getRemoteTierName(), buffer); + Buffer compressedBuffer = compressBufferIfPossible(buffer, bufferCompressor); + if (compressedBuffer.isBuffer()) { + memoryManager.transferBufferOwnership( + bufferOwner, getRemoteTierName(), compressedBuffer); } currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++; - cacheDataManager.appendBuffer(buffer, subpartitionIndex); + cacheDataManager.appendBuffer(compressedBuffer, subpartitionIndex); return true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java index 041ac33c480..9d87d8e9bc9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; @@ -28,6 +29,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMast import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import javax.annotation.Nullable; + import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; @@ -115,7 +118,8 @@ public class TestingTierFactory implements TierFactory { BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, List<TierShuffleDescriptor> shuffleDescriptors, - int maxRequestedBuffers) { + int maxRequestedBuffers, + @Nullable BufferCompressor bufferCompressor) { return tierProducerAgentSupplier.get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java index 7444dcda671..33a3682c0e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java @@ -229,6 +229,7 @@ public class DiskTierProducerAgentTest { new BatchShuffleReadBufferPool(1, 1), new ManuallyTriggeredScheduledExecutorService(), 0, - Duration.ofMinutes(5)); + Duration.ofMinutes(5), + null); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java index 170ce2e018a..873ccc70197 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java @@ -121,7 +121,8 @@ class MemoryTierProducerAgentTest { false, memoryManager, nettyService, - new TieredStorageResourceRegistry())) { + new TieredStorageResourceRegistry(), + null)) { memoryTierProducerAgent.connectionEstablished( SUBPARTITION_ID, new TestingNettyConnectionWriter.Builder().build()); assertThat(memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 0)).isFalse(); @@ -220,6 +221,7 @@ class MemoryTierProducerAgentTest { isBroadcastOnly, memoryManager, nettyService, - resourceRegistry); + resourceRegistry, + null); } }