This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 80aa1705c [#2494] feat(client): Introduce codec statistics to measure
de/compression performance (#2528)
80aa1705c is described below
commit 80aa1705cda68f0f8bd156b5b26121400c273fc4
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jun 27 15:07:36 2025 +0800
[#2494] feat(client): Introduce codec statistics to measure de/compression
performance (#2528)
### What changes were proposed in this pull request?
Introduce codec statistics to measure de/compression performance.

### Why are the changes needed?
This is a subtask of #2494.
### Does this PR introduce _any_ user-facing change?
Yes. It adds the client option `rss.client.io.compression.statisticsEnabled` (default: `false`).
### How was this patch tested?
Needn't.
---
.../spark/shuffle/writer/WriteBufferManager.java | 16 ++-
.../spark/shuffle/writer/RssShuffleWriter.java | 1 +
.../apache/uniffle/common/compression/Codec.java | 9 ++
.../common/compression/StatisticsCodec.java | 118 +++++++++++++++++++++
.../uniffle/common/config/RssClientConf.java | 6 ++
5 files changed, 149 insertions(+), 1 deletion(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 671fe6d7f..39b82e496 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -51,6 +51,7 @@ import
org.apache.uniffle.client.common.ShuffleServerPushCostTracker;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
+import org.apache.uniffle.common.compression.StatisticsCodec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.BlockIdLayout;
@@ -193,7 +194,7 @@ public class WriteBufferManager extends MemoryConsumer {
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY.substring(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
RssSparkConfig.SPARK_SHUFFLE_COMPRESS_DEFAULT);
- this.codec = compress ? Codec.newInstance(rssConf) : Optional.empty();
+ this.codec = compress ? Codec.create(rssConf) : Optional.empty();
this.spillFunc = spillFunc;
this.sendSizeLimit =
rssConf.get(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMITATION);
this.memorySpillTimeoutSec =
rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
@@ -718,4 +719,17 @@ public class WriteBufferManager extends MemoryConsumer {
public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
return shuffleServerPushCostTracker;
}
+
+ public void close() {
+ try {
+ if (codec.isPresent()) {
+ Codec internalCodec = codec.get();
+ if (internalCodec instanceof StatisticsCodec) {
+ ((StatisticsCodec) internalCodec).statistics();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Errors on closing buffer manager", e);
+ }
+ }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 8441dc145..a96a29b69 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -982,6 +982,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
// free all memory & metadata, or memory leak happen in executor
if (bufferManager != null) {
bufferManager.freeAllMemory();
+ bufferManager.close();
}
if (shuffleManager != null) {
shuffleManager.clearTaskMeta(taskId);
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 717a70caf..eb3b2d770 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -22,10 +22,19 @@ import java.util.Optional;
import org.apache.uniffle.common.config.RssConf;
+import static
org.apache.uniffle.common.config.RssClientConf.COMPRESSION_STATISTICS_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
public abstract class Codec {
+ public static Optional<Codec> create(RssConf rssConf) {
+ Optional<Codec> codec = newInstance(rssConf);
+ if (codec.isPresent() &&
rssConf.getBoolean(COMPRESSION_STATISTICS_ENABLED)) {
+ return Optional.of(new StatisticsCodec(codec.get()));
+ }
+ return codec;
+ }
+
public static Optional<Codec> newInstance(RssConf rssConf) {
Type type = rssConf.get(COMPRESSION_TYPE);
switch (type) {
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
new file mode 100644
index 000000000..49aabc32e
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticsCodec extends Codec {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StatisticsCodec.class);
+
+ private final Codec codec;
+ // todo: decompression could be involved in cost tracking.
+ private List<CodecCost> compressCosts;
+
+ StatisticsCodec(Codec codec) {
+ LOGGER.info("Statistic codec is enabled");
+ this.codec = codec;
+ this.compressCosts = new ArrayList<>();
+ }
+
+ @Override
+ public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest,
int destOffset) {
+ this.codec.decompress(src, uncompressedLen, dest, destOffset);
+ }
+
+ @Override
+ public byte[] compress(byte[] src) {
+ long start = System.currentTimeMillis();
+ byte[] result = this.codec.compress(src);
+ compressCosts.add(new CodecCost(src.length, result.length,
System.currentTimeMillis() - start));
+ return result;
+ }
+
+ @Override
+ public int compress(ByteBuffer src, ByteBuffer dest) {
+ return this.codec.compress(src, dest);
+ }
+
+ @Override
+ public int maxCompressedLength(int sourceLength) {
+ return this.codec.maxCompressedLength(sourceLength);
+ }
+
+ public void statistics() {
+ if (compressCosts.isEmpty()) {
+ return;
+ }
+
+ // Sort by sourceByteSize
+ compressCosts.sort(Comparator.comparingInt(c -> c.sourceByteSize));
+
+ LOGGER.info(
+ "Statistics of compression({}): \n"
+ + "-------------------------------------------"
+ + "\nMinimum: {} \nP25: {} \nMedian: {} \nP75: {} \nP90: {}
\nMaximum: {}\n"
+ + "-------------------------------------------",
+ compressCosts.size(),
+ compressCosts.get(0),
+ percentile(compressCosts, 0.25),
+ percentile(compressCosts, 0.50),
+ percentile(compressCosts, 0.75),
+ percentile(compressCosts, 0.90),
+ compressCosts.get(compressCosts.size() - 1));
+ }
+
+ private CodecCost percentile(List<CodecCost> values, double percentile) {
+ if (values.isEmpty()) {
+ return null;
+ }
+ int index = (int) Math.ceil(percentile * values.size()) - 1;
+ index = Math.min(Math.max(index, 0), values.size() - 1);
+ return values.get(index);
+ }
+
+ static class CodecCost {
+ private int sourceByteSize;
+ private int targetByteSize;
+ private long duration;
+
+ CodecCost(int sourceByteSize, int targetByteSize, long duration) {
+ this.sourceByteSize = sourceByteSize;
+ this.targetByteSize = targetByteSize;
+ this.duration = duration;
+ }
+
+ @Override
+ public String toString() {
+ return "CodecCost{"
+ + "sourceByteSize="
+ + sourceByteSize
+ + ", targetByteSize="
+ + targetByteSize
+ + ", durationMillis="
+ + duration
+ + '}';
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index fdeec0f5d..0c3a465f2 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -39,6 +39,12 @@ public class RssClientConf {
*/
public static final String HADOOP_CONFIG_KEY_PREFIX = "rss.hadoop.";
+ public static final ConfigOption<Boolean> COMPRESSION_STATISTICS_ENABLED =
+ ConfigOptions.key("rss.client.io.compression.statisticsEnabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable compression statistics on RSS
clients");
+
public static final ConfigOption<Codec.Type> COMPRESSION_TYPE =
ConfigOptions.key("rss.client.io.compression.codec")
.enumType(Codec.Type.class)