This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4629fc6ee [#2544] fix(client): NPE about StatisticsCodec (#2547)
4629fc6ee is described below
commit 4629fc6eeb8b690056fc076ce22f4f8e3665348a
Author: SalvadorRomo <[email protected]>
AuthorDate: Wed Jul 23 05:14:09 2025 -0600
[#2544] fix(client): NPE about StatisticsCodec (#2547)
### What changes were proposed in this pull request?
Converting `List<CodecCost>` into a synchronized list.
### Why are the changes needed?
This bug happened in a concurrent environment: when Spark workers run with the
`RssShuffleManager` and the `spark.rss.client.io.compression.statisticsEnabled`
property enabled, the codec logs its compression statistics when it finishes,
but since the class was not prepared for concurrent access, the threads
mutating `List<CodecCost>` while the `codec.statistics()` method reads it enter
a race condition.
Fix: #2544
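`Collections.synchronizedList` wraps every list method in a lock on the list itself, so concurrent `add()` calls cannot corrupt the backing `ArrayList`. A minimal, self-contained sketch of the pattern (the class name and thread counts below are illustrative, not taken from the Uniffle code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SyncListDemo {
    // Fills a synchronized list from several threads and returns its final size.
    static int concurrentFill(int threads, int perThread) throws InterruptedException {
        List<Long> costs = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    costs.add(System.nanoTime()); // each add() is now atomic
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return costs.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // With a bare ArrayList, concurrent add() calls can lose elements or
        // throw exceptions; the synchronized wrapper makes the count reliable.
        System.out.println(concurrentFill(8, 1000)); // prints 8000
    }
}
```

Note that iterating over a synchronized list (for example, sorting the costs with a `Comparator` when the statistics are reported) still requires an explicit `synchronized` block on the list; only individual method calls are atomic.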
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
this issue was difficult to replicate in a local environment, so after the
patch was applied I instead made sure the application continued working as
usual by:
1- deploying the application based on `./deploy/docker/read.me`
2- when executing the spark-shell, making sure to include the
`spark.rss.client.io.compression.statisticsEnabled` property, as follows:
```
docker exec -it rss-spark-master-1 /opt/spark/bin/spark-shell \
--master spark://rss-spark-master-1:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
--conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
--conf spark.rss.storage.type=MEMORY_LOCALFILE \
--conf spark.speculation=true \
--conf spark.rss.client.io.compression.statisticsEnabled=true
```
3- run multiple Spark Scala jobs
4- when the jobs finish, look for the logs on each worker under
`/opt/spark/work/...`
5- look for the entries in each file that successfully log the statistics.
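For step 5, one way to check the worker files is to grep for the message that `StatisticsCodec` writes when it is enabled. The exact file layout under `/opt/spark/work/...` depends on the deployment, so the sample log line below is illustrative only:

```shell
# Illustrative log line piped through the same grep you would run against
# the real worker files under /opt/spark/work/... (path layout varies).
echo '25/07/23 05:14:09 INFO StatisticsCodec: Statistic codec is enabled' \
  | grep -c 'Statistic codec'
```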
---------
Co-authored-by: Salvador Romo <[email protected]>
---
.../java/org/apache/uniffle/common/compression/StatisticsCodec.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
index 49aabc32e..50ba22a10 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.compression;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -30,12 +31,12 @@ public class StatisticsCodec extends Codec {
private final Codec codec;
// todo: decompression could be involved in cost tracking.
- private List<CodecCost> compressCosts;
+ private final List<CodecCost> compressCosts;
StatisticsCodec(Codec codec) {
LOGGER.info("Statistic codec is enabled");
this.codec = codec;
- this.compressCosts = new ArrayList<>();
+ this.compressCosts = Collections.synchronizedList(new ArrayList<>());
}
@Override