This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fdcb54cad [#2197] Support reg app conf to server and avoid update 
committed/cached blockIds bitmap (#2196)
fdcb54cad is described below

commit fdcb54cadd1480f028b628adbf8301e5ad593a74
Author: maobaolong <[email protected]>
AuthorDate: Tue Oct 29 20:41:04 2024 +0800

    [#2197] Support reg app conf to server and avoid update committed/cached 
blockIds bitmap (#2196)
    
    ### What changes were proposed in this pull request?
    
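    Support registering the application's configuration with the shuffle server: the client now passes it as a `properties` map when registering a shuffle.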
    
    ### Why are the changes needed?
    
    Fix #2197
    
    Avoid updating the committed/cached blockIds bitmaps when the storage type 
includes memory.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Locally
---
 .../hadoop/mapred/SortWriteBufferManagerTest.java  |  3 +-
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |  3 +-
 .../shuffle/manager/RssShuffleManagerBase.java     | 26 ++++++++++-
 .../apache/spark/shuffle/RssShuffleManager.java    |  5 ---
 .../common/sort/buffer/WriteBufferManagerTest.java |  3 +-
 .../uniffle/client/api/ShuffleWriteClient.java     | 51 +++++++++++++++++++++-
 .../client/impl/ShuffleWriteClientImpl.java        |  6 ++-
 .../record/reader/MockedShuffleWriteClient.java    |  3 +-
 .../org/apache/uniffle/common/util/Constants.java  |  2 +
 .../test/AQESkewedJoinWithLocalOrderTest.java      |  2 +-
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  9 ++--
 .../client/request/RssRegisterShuffleRequest.java  | 23 ++++++++--
 proto/src/main/proto/Rss.proto                     |  1 +
 .../apache/uniffle/server/ShuffleFlushManager.java | 12 +++--
 .../uniffle/server/ShuffleServerGrpcService.java   | 31 ++++++++++++-
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 11 +++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 38 ++++++++++++----
 17 files changed, 192 insertions(+), 37 deletions(-)

diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 159da8a84..77a008c1c 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -722,7 +722,8 @@ public class SortWriteBufferManagerTest {
         ShuffleDataDistributionType distributionType,
         int maxConcurrencyPerPartitionToWrite,
         int stageAttemptNumber,
-        RssProtos.MergeContext mergeContext) {}
+        RssProtos.MergeContext mergeContext,
+        Map<String, String> properties) {}
 
     @Override
     public boolean sendCommit(
diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index d2aaebe04..82a98a84e 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -508,7 +508,8 @@ public class FetcherTest {
         ShuffleDataDistributionType distributionType,
         int maxConcurrencyPerPartitionToWrite,
         int stageAttemptNumber,
-        RssProtos.MergeContext mergeContext) {}
+        RssProtos.MergeContext mergeContext,
+        Map<String, String> properties) {}
 
     @Override
     public boolean sendCommit(
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 47f9e271d..11d81ed31 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -34,6 +34,8 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1016,6 +1018,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     }
     LOG.info("Start to register shuffleId {}", shuffleId);
     long start = System.currentTimeMillis();
+    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
     serverToPartitionRanges.entrySet().stream()
         .forEach(
             entry -> {
@@ -1028,7 +1031,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                   ShuffleDataDistributionType.NORMAL,
                   maxConcurrencyPerPartitionToWrite,
                   stageAttemptNumber,
-                  null);
+                  null,
+                  sparkConfMap);
             });
     LOG.info(
         "Finish register shuffleId {} with {} ms", shuffleId, 
(System.currentTimeMillis() - start));
@@ -1045,6 +1049,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     }
     LOG.info("Start to register shuffleId[{}]", shuffleId);
     long start = System.currentTimeMillis();
+    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
     Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
         serverToPartitionRanges.entrySet();
     entries.stream()
@@ -1057,7 +1062,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
                   entry.getValue(),
                   remoteStorage,
                   dataDistributionType,
-                  maxConcurrencyPerPartitionToWrite);
+                  maxConcurrencyPerPartitionToWrite,
+                  sparkConfMap);
             });
     LOG.info(
         "Finish register shuffleId[{}] with {} ms",
@@ -1084,4 +1090,20 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
   public boolean isRssStageRetryForFetchFailureEnabled() {
     return rssStageRetryForFetchFailureEnabled;
   }
+
+  @VisibleForTesting
+  public SparkConf getSparkConf() {
+    return sparkConf;
+  }
+
+  public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
+    Map<String, String> map = new HashMap<>();
+
+    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
+      String key = tuple._1;
+      map.put(key, tuple._2);
+    }
+
+    return map;
+  }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 70369f503..95c89bd29 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -899,11 +899,6 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
         this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
   }
 
-  @VisibleForTesting
-  public SparkConf getSparkConf() {
-    return sparkConf;
-  }
-
   private synchronized void startHeartbeat() {
     shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, 
user);
     if (!heartbeatStarted) {
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 3765dc83b..05fd55bc2 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -716,7 +716,8 @@ public class WriteBufferManagerTest {
         ShuffleDataDistributionType dataDistributionType,
         int maxConcurrencyPerPartitionToWrite,
         int stageAttemptNumber,
-        RssProtos.MergeContext mergeContext) {}
+        RssProtos.MergeContext mergeContext,
+        Map<String, String> properties) {}
 
     @Override
     public boolean sendCommit(
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index d21c7e67b..caab46020 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -73,7 +73,53 @@ public interface ShuffleWriteClient {
         dataDistributionType,
         maxConcurrencyPerPartitionToWrite,
         0,
-        null);
+        null,
+        Collections.emptyMap());
+  }
+
+  default void registerShuffle(
+      ShuffleServerInfo shuffleServerInfo,
+      String appId,
+      int shuffleId,
+      List<PartitionRange> partitionRanges,
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType,
+      int maxConcurrencyPerPartitionToWrite,
+      Map<String, String> properties) {
+    registerShuffle(
+        shuffleServerInfo,
+        appId,
+        shuffleId,
+        partitionRanges,
+        remoteStorage,
+        dataDistributionType,
+        maxConcurrencyPerPartitionToWrite,
+        0,
+        null,
+        properties);
+  }
+
+  default void registerShuffle(
+      ShuffleServerInfo shuffleServerInfo,
+      String appId,
+      int shuffleId,
+      List<PartitionRange> partitionRanges,
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType,
+      int maxConcurrencyPerPartitionToWrite,
+      int stageAttemptNumber,
+      MergeContext mergeContext) {
+    registerShuffle(
+        shuffleServerInfo,
+        appId,
+        shuffleId,
+        partitionRanges,
+        remoteStorage,
+        dataDistributionType,
+        maxConcurrencyPerPartitionToWrite,
+        stageAttemptNumber,
+        mergeContext,
+        Collections.emptyMap());
   }
 
   void registerShuffle(
@@ -85,7 +131,8 @@ public interface ShuffleWriteClient {
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext);
+      MergeContext mergeContext,
+      Map<String, String> properties);
 
   boolean sendCommit(
       Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int 
shuffleId, int numMaps);
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index c81d3c725..ac93d57b1 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -565,7 +565,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     String user = null;
     try {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -583,7 +584,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
             dataDistributionType,
             maxConcurrencyPerPartitionToWrite,
             stageAttemptNumber,
-            mergeContext);
+            mergeContext,
+            properties);
     RssRegisterShuffleResponse response =
         getShuffleServerClient(shuffleServerInfo).registerShuffle(request);
 
diff --git 
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
 
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
index 6798a792c..d856292f2 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
@@ -64,7 +64,8 @@ public class MockedShuffleWriteClient implements 
ShuffleWriteClient {
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      RssProtos.MergeContext mergeContext) {}
+      RssProtos.MergeContext mergeContext,
+      Map<String, String> properties) {}
 
   @Override
   public boolean sendCommit(
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index d63c2e46e..79ceb2f10 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -91,4 +91,6 @@ public final class Constants {
   public static final String DRIVER_HOST = "driver.host";
 
   public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+  public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
 }
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
index aff3ff3e2..c6c8b0796 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java
@@ -28,7 +28,7 @@ public class AQESkewedJoinWithLocalOrderTest extends 
AQESkewedJoinTest {
 
   @Override
   public void updateSparkConfCustomer(SparkConf sparkConf) {
-    sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
+    sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE.name());
     sparkConf.set(
         "spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(),
         ShuffleDataDistributionType.LOCAL_ORDER.name());
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index dccd9f938..98180d647 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -198,7 +198,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     ShuffleRegisterRequest.Builder reqBuilder = 
ShuffleRegisterRequest.newBuilder();
     reqBuilder
         .setAppId(appId)
@@ -207,7 +208,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         
.setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
         
.setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite)
         .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges))
-        .setStageAttemptNumber(stageAttemptNumber);
+        .setStageAttemptNumber(stageAttemptNumber)
+        .putAllProperties(properties);
     if (mergeContext != null) {
       reqBuilder.setMergeContext(mergeContext);
     }
@@ -484,7 +486,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             request.getDataDistributionType(),
             request.getMaxConcurrencyPerPartitionToWrite(),
             request.getStageAttemptNumber(),
-            request.getMergeContext());
+            request.getMergeContext(),
+            request.getProperties());
 
     RssRegisterShuffleResponse response;
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
index 92ed1e15e..a2cac5367 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
@@ -17,8 +17,11 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.PartitionRange;
@@ -39,7 +42,9 @@ public class RssRegisterShuffleRequest {
   private int stageAttemptNumber;
 
   private final MergeContext mergeContext;
+  private Map<String, String> properties;
 
+  @VisibleForTesting
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
@@ -57,7 +62,8 @@ public class RssRegisterShuffleRequest {
         dataDistributionType,
         maxConcurrencyPerPartitionToWrite,
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public RssRegisterShuffleRequest(
@@ -69,7 +75,8 @@ public class RssRegisterShuffleRequest {
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionRanges = partitionRanges;
@@ -79,8 +86,10 @@ public class RssRegisterShuffleRequest {
     this.maxConcurrencyPerPartitionToWrite = maxConcurrencyPerPartitionToWrite;
     this.stageAttemptNumber = stageAttemptNumber;
     this.mergeContext = mergeContext;
+    this.properties = properties;
   }
 
+  @VisibleForTesting
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
@@ -97,7 +106,8 @@ public class RssRegisterShuffleRequest {
         dataDistributionType,
         RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public RssRegisterShuffleRequest(
@@ -111,7 +121,8 @@ public class RssRegisterShuffleRequest {
         ShuffleDataDistributionType.NORMAL,
         RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public String getAppId() {
@@ -149,4 +160,8 @@ public class RssRegisterShuffleRequest {
   public MergeContext getMergeContext() {
     return mergeContext;
   }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d92ec40c7..5e8cc632d 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -197,6 +197,7 @@ message ShuffleRegisterRequest {
   int32 maxConcurrencyPerPartitionToWrite = 7;
   int32 stageAttemptNumber = 8;
   MergeContext mergeContext = 9;
+  map<string, string> properties = 10;
 }
 
 enum DataDistribution {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 61fdc466a..6fcf7e5c7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -46,6 +46,7 @@ import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
+import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.COMMITTED_BLOCK_COUNT;
@@ -62,6 +63,7 @@ public class ShuffleFlushManager {
   private final String storageType;
   private final int storageDataReplica;
   private final ShuffleServerConf shuffleServerConf;
+  private final boolean storageTypeWithMemory;
   private Configuration hadoopConf;
   // appId -> shuffleId -> committed shuffle blockIds
   private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
@@ -101,6 +103,7 @@ public class ShuffleFlushManager {
                 .mapToLong(bitmap -> bitmap.getLongCardinality())
                 .sum(),
         2 * 60 * 1000L /* 2 minutes */);
+    this.storageTypeWithMemory = 
StorageType.withMemory(StorageType.valueOf(storageType));
   }
 
   public void addToFlushQueue(ShuffleDataFlushEvent event) {
@@ -194,11 +197,14 @@ public class ShuffleFlushManager {
         throw new EventRetryException();
       }
       long endTime = System.currentTimeMillis();
-
-      // update some metrics for shuffle task
-      updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
event.getShuffleBlocks());
       ShuffleTaskInfo shuffleTaskInfo =
           
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+      if (shuffleTaskInfo == null || !storageTypeWithMemory) {
+        // With memory storage type should never need cachedBlockIds,
+        // since client do not need call finish shuffle rpc
+        // update some metrics for shuffle task
+        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), 
event.getShuffleBlocks());
+      }
       if (isStorageAuditLogEnabled) {
         AUDIT_LOGGER.info(
             String.format(
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index fbb86e8a4..85fe7fa6e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -98,6 +98,7 @@ import org.apache.uniffle.server.merge.MergeStatus;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.server.merge.ShuffleMergeManager.MERGE_APP_SUFFIX;
 
@@ -322,7 +323,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                   new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                   user,
                   shuffleDataDistributionType,
-                  maxConcurrencyPerPartitionToWrite);
+                  maxConcurrencyPerPartitionToWrite,
+                  req.getPropertiesMap());
       if (StatusCode.SUCCESS == result
           && shuffleServer.isRemoteMergeEnable()
           && req.hasMergeContext()) {
@@ -338,7 +340,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                     new RemoteStorageInfo(remoteStoragePath, 
remoteStorageConf),
                     user,
                     shuffleDataDistributionType,
-                    maxConcurrencyPerPartitionToWrite);
+                    maxConcurrencyPerPartitionToWrite,
+                    req.getPropertiesMap());
         if (result == StatusCode.SUCCESS) {
           result =
               shuffleServer
@@ -576,6 +579,18 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       auditContext.withAppId(appId).withShuffleId(shuffleId);
+      org.apache.uniffle.common.StorageType storageType =
+          
shuffleServer.getShuffleServerConf().get(ShuffleServerConf.RSS_STORAGE_TYPE);
+      boolean storageTypeWithMemory =
+          StorageType.withMemory(StorageType.valueOf(storageType.name()));
+      if (storageTypeWithMemory) {
+        String errorMessage =
+            String.format(
+                "commitShuffleTask should not be called while server-side 
configured StorageType to %s for appId %s",
+                storageType, appId);
+        LOG.error(errorMessage);
+        throw new UnsupportedOperationException(errorMessage);
+      }
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
         auditContext.withStatusCode(status);
@@ -633,6 +648,18 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       String appId = req.getAppId();
       int shuffleId = req.getShuffleId();
       auditContext.withAppId(appId).withShuffleId(shuffleId);
+      org.apache.uniffle.common.StorageType storageType =
+          
shuffleServer.getShuffleServerConf().get(ShuffleServerConf.RSS_STORAGE_TYPE);
+      boolean storageTypeWithMemory =
+          StorageType.withMemory(StorageType.valueOf(storageType.name()));
+      if (storageTypeWithMemory) {
+        String errorMessage =
+            String.format(
+                "finishShuffle should not be called while server-side 
configured StorageType to %s for appId %s",
+                storageType, appId);
+        LOG.error(errorMessage);
+        throw new UnsupportedOperationException(errorMessage);
+      }
       StatusCode status = verifyRequest(appId);
       if (status != StatusCode.SUCCESS) {
         auditContext.withStatusCode(status);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index be030769f..11ac2dff3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -76,6 +77,7 @@ public class ShuffleTaskInfo {
   private final Map<Integer, ShuffleDetailInfo> shuffleDetailInfos;
 
   private final Map<Integer, Integer> latestStageAttemptNumbers;
+  private Map<String, String> properties;
 
   public ShuffleTaskInfo(String appId) {
     this.appId = appId;
@@ -315,4 +317,13 @@ public class ShuffleTaskInfo {
         + shuffleDetailInfos
         + '}';
   }
+
+  public void setProperties(Map<String, String> properties) {
+    Map<String, String> filteredProperties =
+        properties.entrySet().stream()
+            .filter(entry -> entry.getKey().contains(".rss."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    this.properties = filteredProperties;
+    LOGGER.info("{} set properties to {}", appId, properties);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index dbc94c007..af37646a7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +87,7 @@ import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
@@ -96,6 +98,7 @@ import static 
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUN
 public class ShuffleTaskManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleTaskManager.class);
+  private final boolean storageTypeWithMemory;
   private ShuffleFlushManager shuffleFlushManager;
   private final ScheduledExecutorService scheduledExecutorService;
   private final ScheduledExecutorService expiredAppCleanupExecutorService;
@@ -146,6 +149,12 @@ public class ShuffleTaskManager {
     this.shuffleBufferManager = shuffleBufferManager;
     this.storageManager = storageManager;
     this.shuffleMergeManager = shuffleMergeManager;
+    org.apache.uniffle.common.StorageType storageType =
+        conf.get(ShuffleServerConf.RSS_STORAGE_TYPE);
+    this.storageTypeWithMemory =
+        storageType == null
+            ? false
+            : StorageType.withMemory(StorageType.valueOf(storageType.name()));
     this.appExpiredWithoutHB = 
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
     this.commitCheckIntervalMax = 
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
     this.preAllocationExpired = 
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
@@ -301,7 +310,8 @@ public class ShuffleTaskManager {
         remoteStorageInfo,
         user,
         ShuffleDataDistributionType.NORMAL,
-        -1);
+        -1,
+        Collections.emptyMap());
   }
 
   public StatusCode registerShuffle(
@@ -311,13 +321,15 @@ public class ShuffleTaskManager {
       RemoteStorageInfo remoteStorageInfo,
       String user,
       ShuffleDataDistributionType dataDistType,
-      int maxConcurrencyPerPartitionToWrite) {
+      int maxConcurrencyPerPartitionToWrite,
+      Map<String, String> properties) {
     ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
     lock.lock();
     try {
       refreshAppId(appId);
 
       ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+      taskInfo.setProperties(properties);
       taskInfo.setUser(user);
       taskInfo.setSpecification(
           ShuffleSpecification.builder()
@@ -520,15 +532,23 @@ public class ShuffleTaskManager {
     }
     ShuffleTaskInfo shuffleTaskInfo =
         shuffleTaskInfos.computeIfAbsent(appId, x -> new 
ShuffleTaskInfo(appId));
-    Roaring64NavigableMap bitmap =
-        shuffleTaskInfo
-            .getCachedBlockIds()
-            .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
-
     long size = 0L;
-    synchronized (bitmap) {
+    // With memory storage type should never need cachedBlockIds,
+    // since client do not need call finish shuffle rpc
+    if (!storageTypeWithMemory) {
+      Roaring64NavigableMap bitmap =
+          shuffleTaskInfo
+              .getCachedBlockIds()
+              .computeIfAbsent(shuffleId, x -> 
Roaring64NavigableMap.bitmapOf());
+
+      synchronized (bitmap) {
+        for (ShufflePartitionedBlock spb : spbs) {
+          bitmap.addLong(spb.getBlockId());
+          size += spb.getEncodedLength();
+        }
+      }
+    } else {
       for (ShufflePartitionedBlock spb : spbs) {
-        bitmap.addLong(spb.getBlockId());
         size += spb.getEncodedLength();
       }
     }

Reply via email to