This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 80aa1705c [#2494] feat(client): Introduce codec statistics to measure de/compression performance (#2528)
80aa1705c is described below

commit 80aa1705cda68f0f8bd156b5b26121400c273fc4
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jun 27 15:07:36 2025 +0800

    [#2494] feat(client): Introduce codec statistics to measure de/compression performance (#2528)
    
    ### What changes were proposed in this pull request?
    
    Introduce codec statistics to measure de/compression performance.
    
    
![image](https://github.com/user-attachments/assets/bfe9d698-fd51-4187-9b9f-24a9413bb4f7)
    
    ### Why are the changes needed?
    
    This is a subtask of #2494.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Adds the client option `rss.client.io.compression.statisticsEnabled` (default: `false`).
    
    ### How was this patch tested?
    
    No need.
---
 .../spark/shuffle/writer/WriteBufferManager.java   |  16 ++-
 .../spark/shuffle/writer/RssShuffleWriter.java     |   1 +
 .../apache/uniffle/common/compression/Codec.java   |   9 ++
 .../common/compression/StatisticsCodec.java        | 118 +++++++++++++++++++++
 .../uniffle/common/config/RssClientConf.java       |   6 ++
 5 files changed, 149 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 671fe6d7f..39b82e496 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -51,6 +51,7 @@ import 
org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
+import org.apache.uniffle.common.compression.StatisticsCodec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.BlockIdLayout;
@@ -193,7 +194,7 @@ public class WriteBufferManager extends MemoryConsumer {
             RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY.substring(
                 RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
             RssSparkConfig.SPARK_SHUFFLE_COMPRESS_DEFAULT);
-    this.codec = compress ? Codec.newInstance(rssConf) : Optional.empty();
+    this.codec = compress ? Codec.create(rssConf) : Optional.empty();
     this.spillFunc = spillFunc;
     this.sendSizeLimit = 
rssConf.get(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMITATION);
     this.memorySpillTimeoutSec = 
rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
@@ -718,4 +719,17 @@ public class WriteBufferManager extends MemoryConsumer {
   public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
     return shuffleServerPushCostTracker;
   }
+
+  public void close() {
+    try {
+      if (codec.isPresent()) {
+        Codec internalCodec = codec.get();
+        if (internalCodec instanceof StatisticsCodec) {
+          ((StatisticsCodec) internalCodec).statistics();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Errors on closing buffer manager", e);
+    }
+  }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 8441dc145..a96a29b69 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -982,6 +982,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       // free all memory & metadata, or memory leak happen in executor
       if (bufferManager != null) {
         bufferManager.freeAllMemory();
+        bufferManager.close();
       }
       if (shuffleManager != null) {
         shuffleManager.clearTaskMeta(taskId);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java 
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 717a70caf..eb3b2d770 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -22,10 +22,19 @@ import java.util.Optional;
 
 import org.apache.uniffle.common.config.RssConf;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.COMPRESSION_STATISTICS_ENABLED;
 import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
 
 public abstract class Codec {
 
+  public static Optional<Codec> create(RssConf rssConf) {
+    Optional<Codec> codec = newInstance(rssConf);
+    if (codec.isPresent() && 
rssConf.getBoolean(COMPRESSION_STATISTICS_ENABLED)) {
+      return Optional.of(new StatisticsCodec(codec.get()));
+    }
+    return codec;
+  }
+
   public static Optional<Codec> newInstance(RssConf rssConf) {
     Type type = rssConf.get(COMPRESSION_TYPE);
     switch (type) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
 
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
new file mode 100644
index 000000000..49aabc32e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticsCodec extends Codec {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StatisticsCodec.class);
+
+  private final Codec codec;
+  // todo: decompression could be involved in cost tracking.
+  private List<CodecCost> compressCosts;
+
+  StatisticsCodec(Codec codec) {
+    LOGGER.info("Statistic codec is enabled");
+    this.codec = codec;
+    this.compressCosts = new ArrayList<>();
+  }
+
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, 
int destOffset) {
+    this.codec.decompress(src, uncompressedLen, dest, destOffset);
+  }
+
+  @Override
+  public byte[] compress(byte[] src) {
+    long start = System.currentTimeMillis();
+    byte[] result = this.codec.compress(src);
+    compressCosts.add(new CodecCost(src.length, result.length, 
System.currentTimeMillis() - start));
+    return result;
+  }
+
+  @Override
+  public int compress(ByteBuffer src, ByteBuffer dest) {
+    return this.codec.compress(src, dest);
+  }
+
+  @Override
+  public int maxCompressedLength(int sourceLength) {
+    return this.codec.maxCompressedLength(sourceLength);
+  }
+
+  public void statistics() {
+    if (compressCosts.isEmpty()) {
+      return;
+    }
+
+    // Sort by sourceByteSize
+    compressCosts.sort(Comparator.comparingInt(c -> c.sourceByteSize));
+
+    LOGGER.info(
+        "Statistics of compression({}): \n"
+            + "-------------------------------------------"
+            + "\nMinimum: {} \nP25: {} \nMedian: {} \nP75: {} \nP90: {} 
\nMaximum: {}\n"
+            + "-------------------------------------------",
+        compressCosts.size(),
+        compressCosts.get(0),
+        percentile(compressCosts, 0.25),
+        percentile(compressCosts, 0.50),
+        percentile(compressCosts, 0.75),
+        percentile(compressCosts, 0.90),
+        compressCosts.get(compressCosts.size() - 1));
+  }
+
+  private CodecCost percentile(List<CodecCost> values, double percentile) {
+    if (values.isEmpty()) {
+      return null;
+    }
+    int index = (int) Math.ceil(percentile * values.size()) - 1;
+    index = Math.min(Math.max(index, 0), values.size() - 1);
+    return values.get(index);
+  }
+
+  static class CodecCost {
+    private int sourceByteSize;
+    private int targetByteSize;
+    private long duration;
+
+    CodecCost(int sourceByteSize, int targetByteSize, long duration) {
+      this.sourceByteSize = sourceByteSize;
+      this.targetByteSize = targetByteSize;
+      this.duration = duration;
+    }
+
+    @Override
+    public String toString() {
+      return "CodecCost{"
+          + "sourceByteSize="
+          + sourceByteSize
+          + ", targetByteSize="
+          + targetByteSize
+          + ", durationMillis="
+          + duration
+          + '}';
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index fdeec0f5d..0c3a465f2 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -39,6 +39,12 @@ public class RssClientConf {
    */
   public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";
 
+  public static final ConfigOption<Boolean> COMPRESSION_STATISTICS_ENABLED =
+      ConfigOptions.key("rss.client.io.compression.statisticsEnabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether to enable compression statistics on RSS 
clients");
+
   public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
       ConfigOptions.key("rss.client.io.compression.codec")
           .enumType(Codec.Type.class)

Reply via email to