This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 199a81b6b768b357e8f0cfaa36a7f6e9f33dbef4 Author: maobaolong <[email protected]> AuthorDate: Tue Nov 26 15:35:41 2024 +0800 [MINOR] improvement(spark-client): put sparkConf as extra properties while client request accessCluster (#2254) ### What changes were proposed in this pull request? put sparkConf as extra properties while client request accessCluster ### Why are the changes needed? Coordinator can let the spark application access rss cluster or not by some customized access checker leveraging some of the spark configs. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No need --- .../java/org/apache/spark/shuffle/RssSparkConfig.java | 13 +++++++++++++ .../uniffle/shuffle/manager/RssShuffleManagerBase.java | 17 ++--------------- .../spark/shuffle/DelegationRssShuffleManager.java | 10 ++++++++++ .../spark/shuffle/DelegationRssShuffleManager.java | 10 ++++++++++ .../org/apache/uniffle/common/config/RssClientConf.java | 7 +++++++ 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 45950a42e..4a7f653db 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -17,6 +17,8 @@ package org.apache.spark.shuffle; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import scala.Tuple2; @@ -515,4 +517,15 @@ public class RssSparkConfig { } return rssConf; } + + public static Map<String, String> sparkConfToMap(SparkConf sparkConf) { + Map<String, String> map = new HashMap<>(); + + for (Tuple2<String, String> tuple : sparkConf.getAll()) { + String key = tuple._1; + map.put(key, tuple._2); + } + + return map; + } } diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index 8e921c66e..c1b697cbd 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -34,8 +34,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import scala.Tuple2; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -1064,7 +1062,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac } LOG.info("Start to register shuffleId {}", shuffleId); long start = System.currentTimeMillis(); - Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf()); + Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf()); serverToPartitionRanges.entrySet().stream() .forEach( entry -> { @@ -1095,7 +1093,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac } LOG.info("Start to register shuffleId[{}]", shuffleId); long start = System.currentTimeMillis(); - Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf()); + Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf()); Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries = serverToPartitionRanges.entrySet(); entries.stream() @@ -1141,15 +1139,4 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac public SparkConf getSparkConf() { return sparkConf; } - - public Map<String, String> sparkConfToMap(SparkConf sparkConf) { - Map<String, String> map = new HashMap<>(); - - for (Tuple2<String, String> tuple : sparkConf.getAll()) { - String key = tuple._1; - map.put(key, tuple._2); - } - - return map; - } } diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index 657be54d1..d2cb4038e 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient; import org.apache.uniffle.client.request.RssAccessClusterRequest; import org.apache.uniffle.client.response.RssAccessClusterResponse; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.Constants; @@ -127,6 +130,13 @@ public class DelegationRssShuffleManager implements ShuffleManager { extraProperties.put( ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum)); + RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); + List<String> excludeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + rssConf.getAll().stream() + .filter(entry -> !excludeProperties.contains(entry.getKey())) + .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); + Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf); try { if (coordinatorClient != null) { diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index 622451e14..aaa2c1092 100644 --- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient; import org.apache.uniffle.client.request.RssAccessClusterRequest; import org.apache.uniffle.client.response.RssAccessClusterResponse; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.Constants; @@ -131,6 +134,13 @@ public class DelegationRssShuffleManager implements ShuffleManager { extraProperties.put( ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum)); + RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); + List<String> excludeProperties = + rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES); + rssConf.getAll().stream() + .filter(entry -> !excludeProperties.contains(entry.getKey())) + .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue())); + Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf); try { if (coordinatorClient != null) { diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 6d311a549..79b41afcc 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -303,4 +303,11 @@ public class RssClientConf { .withDescription( "The block id manager class of server for this application, " + "the implementation of this 
interface to manage the shuffle block ids"); + + public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES = + ConfigOptions.key("rss.client.reportExcludeProperties") + .stringType() + .asList() + .defaultValues() + .withDescription("the report exclude properties could be configured by this option"); }
