This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c00877260c0 [FLINK-36021][network] Delegating the responsibility for 
compression to every tier
c00877260c0 is described below

commit c00877260c0dcce8cfd75b7d3a43cf6f68dcc387
Author: Weijie Guo <res...@163.com>
AuthorDate: Fri Aug 9 15:16:57 2024 +0800

    [FLINK-36021][network] Delegating the responsibility for compression to 
every tier
---
 .../hybrid/tiered/common/TieredStorageUtils.java   | 21 +++++++++++++++++++
 .../shuffle/TieredResultPartitionFactory.java      |  9 +++++---
 .../storage/TieredStorageProducerClient.java       | 24 ++++------------------
 .../partition/hybrid/tiered/tier/TierFactory.java  |  6 +++++-
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  9 ++++++--
 .../tiered/tier/disk/DiskTierProducerAgent.java    | 15 ++++++++++----
 .../tiered/tier/memory/MemoryTierFactory.java      |  9 ++++++--
 .../tier/memory/MemoryTierProducerAgent.java       | 16 +++++++++++----
 .../tiered/tier/remote/RemoteTierFactory.java      |  9 ++++++--
 .../tier/remote/RemoteTierProducerAgent.java       | 16 +++++++++++----
 .../hybrid/tiered/storage/TestingTierFactory.java  |  6 +++++-
 .../tier/disk/DiskTierProducerAgentTest.java       |  3 ++-
 .../tier/memory/MemoryTierProducerAgentTest.java   |  6 ++++--
 13 files changed, 103 insertions(+), 46 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
index 8565daf14d2..328573050da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
@@ -32,6 +33,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.R
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Utils for reading from or writing to tiered storage. */
 public class TieredStorageUtils {
 
@@ -128,4 +131,22 @@ public class TieredStorageUtils {
         bufferWithHeaders[index] = header;
         bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
     }
+
+    /** Try compress buffer if possible. */
+    public static Buffer compressBufferIfPossible(
+            Buffer buffer, BufferCompressor bufferCompressor) {
+        if (!canBeCompressed(buffer, bufferCompressor)) {
+            return buffer;
+        }
+
+        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+    }
+
+    /**
+     * Whether the buffer can be compressed or not. Note that event is not 
compressed because it is
+     * usually small and the size can become even larger after compression.
+     */
+    public static boolean canBeCompressed(Buffer buffer, BufferCompressor 
bufferCompressor) {
+        return bufferCompressor != null && buffer.isBuffer() && 
buffer.readableBytes() > 0;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
index 4a00401c4d9..9aaaad28f03 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
@@ -131,7 +131,8 @@ public class TieredResultPartitionFactory {
                                 tierShuffleDescriptors,
                                 fileChannelManager,
                                 batchShuffleReadBufferPool,
-                                batchShuffleReadIOExecutor);
+                                batchShuffleReadIOExecutor,
+                                bufferCompressor);
 
         // Create producer client.
         TieredStorageProducerClient tieredStorageProducerClient =
@@ -199,7 +200,8 @@ public class TieredResultPartitionFactory {
                     List<TierShuffleDescriptor> tierShuffleDescriptors,
                     FileChannelManager fileChannelManager,
                     BatchShuffleReadBufferPool batchShuffleReadBufferPool,
-                    ScheduledExecutorService batchShuffleReadIOExecutor) {
+                    ScheduledExecutorService batchShuffleReadIOExecutor,
+                    @Nullable BufferCompressor bufferCompressor) {
 
         List<TierProducerAgent> tierProducerAgents = new ArrayList<>();
         List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new 
ArrayList<>();
@@ -238,7 +240,8 @@ public class TieredResultPartitionFactory {
                             
Collections.singletonList(tierShuffleDescriptors.get(index)),
                             Math.max(
                                     2 * 
batchShuffleReadBufferPool.getNumBuffersPerRequest(),
-                                    numberOfSubpartitions));
+                                    numberOfSubpartitions),
+                            bufferCompressor);
             tierProducerAgents.add(producerAgent);
             
tieredStorageMemorySpecs.add(tierFactory.getProducerAgentMemorySpec());
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 9a03aef0005..64edce34c49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -144,21 +144,21 @@ public class TieredStorageProducerClient {
             TieredStorageSubpartitionId subpartitionId,
             Buffer accumulatedBuffer,
             int numRemainingConsecutiveBuffers) {
+        int unCompressedSize = accumulatedBuffer.readableBytes();
         try {
-            Buffer compressedBuffer = 
compressBufferIfPossible(accumulatedBuffer);
             if 
(currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
                 chooseStorageTierToStartSegment(subpartitionId, 
numRemainingConsecutiveBuffers + 1);
             }
             if 
(!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
                     subpartitionId,
-                    compressedBuffer,
+                    accumulatedBuffer,
                     bufferAccumulator,
                     numRemainingConsecutiveBuffers)) {
                 chooseStorageTierToStartSegment(subpartitionId, 
numRemainingConsecutiveBuffers + 1);
                 checkState(
                         
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
                                 subpartitionId,
-                                compressedBuffer,
+                                accumulatedBuffer,
                                 bufferAccumulator,
                                 numRemainingConsecutiveBuffers),
                         "Failed to write the first buffer to the new segment");
@@ -167,7 +167,7 @@ public class TieredStorageProducerClient {
             accumulatedBuffer.recycleBuffer();
             ExceptionUtils.rethrow(ioe);
         }
-        updateMetricStatistics(1, accumulatedBuffer.readableBytes());
+        updateMetricStatistics(1, unCompressedSize);
     }
 
     private void chooseStorageTierToStartSegment(
@@ -188,22 +188,6 @@ public class TieredStorageProducerClient {
         throw new IOException("Failed to choose a storage tier to start a new 
segment.");
     }
 
-    private Buffer compressBufferIfPossible(Buffer buffer) {
-        if (!canBeCompressed(buffer)) {
-            return buffer;
-        }
-
-        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
-    }
-
-    /**
-     * Whether the buffer can be compressed or not. Note that event is not 
compressed because it is
-     * usually small and the size can become even larger after compression.
-     */
-    private boolean canBeCompressed(Buffer buffer) {
-        return bufferCompressor != null && buffer.isBuffer() && 
buffer.readableBytes() > 0;
-    }
-
     private void updateMetricStatistics(int numWriteBuffersDelta, int 
numWriteBytesDelta) {
         checkNotNull(metricStatisticsUpdater)
                 .accept(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
index fbfce81f2ee..11fb089a4b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
@@ -27,6 +28,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Tiere
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -61,7 +64,8 @@ public interface TierFactory {
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             List<TierShuffleDescriptor> shuffleDescriptors,
-            int maxRequestedBuffer);
+            int maxRequestedBuffer,
+            @Nullable BufferCompressor bufferCompressor);
 
     /** Creates the consumer-side agent of a Tier. */
     TierConsumerAgent createConsumerAgent(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 786aab43c5f..c3a1ee2c4f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
@@ -38,6 +39,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
@@ -106,7 +109,8 @@ public class DiskTierFactory implements TierFactory {
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             List<TierShuffleDescriptor> shuffleDescriptors,
-            int maxRequestedBuffers) {
+            int maxRequestedBuffers,
+            @Nullable BufferCompressor bufferCompressor) {
         checkState(bufferSizeBytes > 0);
 
         ProducerMergedPartitionFileIndex partitionFileIndex =
@@ -139,7 +143,8 @@ public class DiskTierFactory implements TierFactory {
                 bufferPool,
                 ioExecutor,
                 maxRequestedBuffers,
-                DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT);
+                DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT,
+                bufferCompressor);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index e8d35ea6e39..366a64e033c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -48,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -80,6 +82,8 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
 
     private final DiskIOScheduler diskIOScheduler;
 
+    private final BufferCompressor bufferCompressor;
+
     private volatile boolean isReleased;
 
     DiskTierProducerAgent(
@@ -99,7 +103,8 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             int maxRequestedBuffers,
-            Duration bufferRequestTimeout) {
+            Duration bufferRequestTimeout,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 numBytesPerSegment >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -112,6 +117,7 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
         this.memoryManager = memoryManager;
         this.firstBufferIndexInSegment = new ArrayList<>();
         this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
 
         for (int i = 0; i < numSubpartitions; ++i) {
             // Each map is used to store the segment ids belonging to a 
subpartition. The map can be
@@ -177,11 +183,12 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getDiskTierName(), finishedBuffer);
+        Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(bufferOwner, 
getDiskTierName(), compressedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
-        emitBuffer(finishedBuffer, subpartitionIndex, 
numRemainingConsecutiveBuffers == 0);
+        emitBuffer(compressedBuffer, subpartitionIndex, 
numRemainingConsecutiveBuffers == 0);
         return true;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
index b2d84acb364..8ef55c6b59e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
@@ -34,6 +35,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -91,7 +94,8 @@ public class MemoryTierFactory implements TierFactory {
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             List<TierShuffleDescriptor> shuffleDescriptors,
-            int maxRequestedBuffers) {
+            int maxRequestedBuffers,
+            @Nullable BufferCompressor bufferCompressor) {
         checkState(bufferSizeBytes > 0);
 
         return new MemoryTierProducerAgent(
@@ -103,7 +107,8 @@ public class MemoryTierFactory implements TierFactory {
                 isBroadcastOnly,
                 memoryManager,
                 nettyService,
-                resourceRegistry);
+                resourceRegistry,
+                bufferCompressor);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index 973b8cdf741..54ba4cdd71f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -66,6 +68,8 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
 
     private final MemoryTierSubpartitionProducerAgent[] 
subpartitionProducerAgents;
 
+    private final BufferCompressor bufferCompressor;
+
     public MemoryTierProducerAgent(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
@@ -75,7 +79,8 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
             boolean isBroadcastOnly,
             TieredStorageMemoryManager memoryManager,
             TieredStorageNettyService nettyService,
-            TieredStorageResourceRegistry resourceRegistry) {
+            TieredStorageResourceRegistry resourceRegistry,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 segmentSizeBytes >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -89,6 +94,7 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
         this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
         this.nettyConnectionEstablished = new boolean[numSubpartitions];
         this.subpartitionProducerAgents = new 
MemoryTierSubpartitionProducerAgent[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
 
         Arrays.fill(currentSubpartitionWriteBuffers, 0);
         nettyService.registerProducer(partitionId, this);
@@ -138,11 +144,13 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getMemoryTierName(), finishedBuffer);
+        Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(
+                    bufferOwner, getMemoryTierName(), compressedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
-        addFinishedBuffer(finishedBuffer, subpartitionIndex);
+        addFinishedBuffer(compressedBuffer, subpartitionIndex);
         return true;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
index 5c0f72ecea5..3d374715a9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -37,6 +38,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -101,7 +104,8 @@ public class RemoteTierFactory implements TierFactory {
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             List<TierShuffleDescriptor> shuffleDescriptors,
-            int maxRequestedBuffers) {
+            int maxRequestedBuffers,
+            @Nullable BufferCompressor bufferCompressor) {
         checkState(bufferSizeBytes > 0);
         checkNotNull(remoteStoragePath);
 
@@ -115,7 +119,8 @@ public class RemoteTierFactory implements TierFactory {
                 isBroadcastOnly,
                 partitionFileWriter,
                 storageMemoryManager,
-                resourceRegistry);
+                resourceRegistry,
+                bufferCompressor);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 8444c29fec9..e3cfcb960aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -19,6 +19,7 @@
 package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -28,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 
 import java.util.Arrays;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -44,6 +46,8 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
 
     private final int[] currentSubpartitionSegmentWriteBuffers;
 
+    private final BufferCompressor bufferCompressor;
+
     RemoteTierProducerAgent(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
@@ -52,7 +56,8 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
             boolean isBroadcastOnly,
             PartitionFileWriter partitionFileWriter,
             TieredStorageMemoryManager memoryManager,
-            TieredStorageResourceRegistry resourceRegistry) {
+            TieredStorageResourceRegistry resourceRegistry,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 numBytesPerSegment >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -67,6 +72,7 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
                         memoryManager,
                         partitionFileWriter);
         this.currentSubpartitionSegmentWriteBuffers = new 
int[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
         Arrays.fill(currentSubpartitionSegmentWriteBuffers, 0);
         resourceRegistry.registerResource(partitionId, 
this::releaseAllResources);
     }
@@ -95,11 +101,13 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
             currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (buffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getRemoteTierName(), buffer);
+        Buffer compressedBuffer = compressBufferIfPossible(buffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(
+                    bufferOwner, getRemoteTierName(), compressedBuffer);
         }
         currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
-        cacheDataManager.appendBuffer(buffer, subpartitionIndex);
+        cacheDataManager.appendBuffer(compressedBuffer, subpartitionIndex);
         return true;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
index 041ac33c480..9d87d8e9bc9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
@@ -28,6 +29,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMast
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiFunction;
@@ -115,7 +118,8 @@ public class TestingTierFactory implements TierFactory {
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             List<TierShuffleDescriptor> shuffleDescriptors,
-            int maxRequestedBuffers) {
+            int maxRequestedBuffers,
+            @Nullable BufferCompressor bufferCompressor) {
         return tierProducerAgentSupplier.get();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 7444dcda671..33a3682c0e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -229,6 +229,7 @@ public class DiskTierProducerAgentTest {
                 new BatchShuffleReadBufferPool(1, 1),
                 new ManuallyTriggeredScheduledExecutorService(),
                 0,
-                Duration.ofMinutes(5));
+                Duration.ofMinutes(5),
+                null);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
index 170ce2e018a..873ccc70197 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
@@ -121,7 +121,8 @@ class MemoryTierProducerAgentTest {
                         false,
                         memoryManager,
                         nettyService,
-                        new TieredStorageResourceRegistry())) {
+                        new TieredStorageResourceRegistry(),
+                        null)) {
             memoryTierProducerAgent.connectionEstablished(
                     SUBPARTITION_ID, new 
TestingNettyConnectionWriter.Builder().build());
             
assertThat(memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 
0)).isFalse();
@@ -220,6 +221,7 @@ class MemoryTierProducerAgentTest {
                 isBroadcastOnly,
                 memoryManager,
                 nettyService,
-                resourceRegistry);
+                resourceRegistry,
+                null);
     }
 }

Reply via email to