This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 204e4e4ff [MINOR] improvement(spark-client): put sparkConf as extra 
properties while client request accessCluster (#2254)
204e4e4ff is described below

commit 204e4e4fff380f20c61f2ccbdb43695c3ece6564
Author: maobaolong <[email protected]>
AuthorDate: Tue Nov 26 15:35:41 2024 +0800

    [MINOR] improvement(spark-client): put sparkConf as extra properties while 
client request accessCluster (#2254)
    
    ### What changes were proposed in this pull request?
    
    Pass the SparkConf entries as extra properties when the client sends the accessCluster request.
    
    ### Why are the changes needed?
    
    The coordinator can decide whether a Spark application may access the RSS 
cluster by using a customized access checker that leverages some of the Spark configs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No need
---
 .../java/org/apache/spark/shuffle/RssSparkConfig.java   | 13 +++++++++++++
 .../uniffle/shuffle/manager/RssShuffleManagerBase.java  | 17 ++---------------
 .../spark/shuffle/DelegationRssShuffleManager.java      | 10 ++++++++++
 .../spark/shuffle/DelegationRssShuffleManager.java      | 10 ++++++++++
 .../org/apache/uniffle/common/config/RssClientConf.java |  7 +++++++
 5 files changed, 42 insertions(+), 15 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 45950a42e..4a7f653db 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import scala.Tuple2;
@@ -515,4 +517,15 @@ public class RssSparkConfig {
     }
     return rssConf;
   }
+
+  public static Map<String, String> sparkConfToMap(SparkConf sparkConf) {
+    Map<String, String> map = new HashMap<>();
+
+    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
+      String key = tuple._1;
+      map.put(key, tuple._2);
+    }
+
+    return map;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 8e921c66e..c1b697cbd 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -34,8 +34,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1064,7 +1062,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     }
     LOG.info("Start to register shuffleId {}", shuffleId);
     long start = System.currentTimeMillis();
-    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
+    Map<String, String> sparkConfMap = 
RssSparkConfig.sparkConfToMap(getSparkConf());
     serverToPartitionRanges.entrySet().stream()
         .forEach(
             entry -> {
@@ -1095,7 +1093,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     }
     LOG.info("Start to register shuffleId[{}]", shuffleId);
     long start = System.currentTimeMillis();
-    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
+    Map<String, String> sparkConfMap = 
RssSparkConfig.sparkConfToMap(getSparkConf());
     Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
         serverToPartitionRanges.entrySet();
     entries.stream()
@@ -1141,15 +1139,4 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
   public SparkConf getSparkConf() {
     return sparkConf;
   }
-
-  public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
-    Map<String, String> map = new HashMap<>();
-
-    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
-      String key = tuple._1;
-      map.put(key, tuple._2);
-    }
-
-    return map;
-  }
 }
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 657be54d1..d2cb4038e 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
@@ -127,6 +130,13 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     extraProperties.put(
         ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, 
String.valueOf(assignmentShuffleNodesNum));
 
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    List<String> excludeProperties =
+        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+    rssConf.getAll().stream()
+        .filter(entry -> !excludeProperties.contains(entry.getKey()))
+        .forEach(entry -> extraProperties.put(entry.getKey(), (String) 
entry.getValue()));
+
     Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
     try {
       if (coordinatorClient != null) {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 622451e14..aaa2c1092 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
@@ -131,6 +134,13 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     extraProperties.put(
         ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, 
String.valueOf(assignmentShuffleNodesNum));
 
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    List<String> excludeProperties =
+        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+    rssConf.getAll().stream()
+        .filter(entry -> !excludeProperties.contains(entry.getKey()))
+        .forEach(entry -> extraProperties.put(entry.getKey(), (String) 
entry.getValue()));
+
     Set<String> assignmentTags = 
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
     try {
       if (coordinatorClient != null) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 6d311a549..79b41afcc 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -303,4 +303,11 @@ public class RssClientConf {
           .withDescription(
               "The block id manager class of server for this application, "
                   + "the implementation of this interface to manage the 
shuffle block ids");
+
+  public static final ConfigOption<List<String>> 
RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES =
+      ConfigOptions.key("rss.client.reportExcludeProperties")
+          .stringType()
+          .asList()
+          .defaultValues()
+          .withDescription("the report exclude properties could be configured 
by this option");
 }

Reply via email to