This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4629fc6ee [#2544] fix(client): NPE about StatisticsCodec (#2547)
4629fc6ee is described below

commit 4629fc6eeb8b690056fc076ce22f4f8e3665348a
Author: SalvadorRomo <[email protected]>
AuthorDate: Wed Jul 23 05:14:09 2025 -0600

    [#2544] fix(client): NPE about StatisticsCodec (#2547)
    
    ### What changes were proposed in this pull request?
    Converting  `List<codeCost>`   into a synchronize list
    
    ### Why are the changes needed?
    this bugs happened in concurrent environment when spark workers with the 
`RssShuffleManager` and  spark.rss.client.io.compression.statisticsEnabled 
property enabled, logs its compression statistics when finished, but since the 
class was not prepared for concurrent enviroment, at the time to call 
`List<codeCost>` into the `codec.statistics()` methods enters in a race 
condition.
    
    Fix: # 2544
    
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    this issue was diffcult to replicate  into a local enviroment, but instead  
when the patch was applied, i make sure the application continue working as 
usual by:
    1- deploying the application based on  `./deploy/docker/read.me`
    2- when executing the spark-shell make sure to do it by including the 
`spark.rss.client.io.compression.statisticsEnabled `  props as follow:
    ```
    
    docker exec -it rss-spark-master-1 /opt/spark/bin/spark-shell \
       --master spark://rss-spark-master-1:7077 \
       --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
       --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
       --conf 
spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
       --conf spark.rss.storage.type=MEMORY_LOCALFILE \
       --conf spark.speculation=true \
       --conf spark.rss.client.io.compression.statisticsEnabled=true
    ```
    
    3- run  multipe spark scala jobs
    4-  when finishing, into each worker, look for the logs in 
`/opt/spark/work/...`
    5- looks for every entry in the file that succesfully logs the statistics.
    
    
    ---------
    
    Co-authored-by: Salvador Romo <[email protected]>
---
 .../java/org/apache/uniffle/common/compression/StatisticsCodec.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
 
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
index 49aabc32e..50ba22a10 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/compression/StatisticsCodec.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.compression;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
@@ -30,12 +31,12 @@ public class StatisticsCodec extends Codec {
 
   private final Codec codec;
   // todo: decompression could be involved in cost tracking.
-  private List<CodecCost> compressCosts;
+  private final List<CodecCost> compressCosts;
 
   StatisticsCodec(Codec codec) {
     LOGGER.info("Statistic codec is enabled");
     this.codec = codec;
-    this.compressCosts = new ArrayList<>();
+    this.compressCosts = Collections.synchronizedList(new ArrayList<>());
   }
 
   @Override

Reply via email to