This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 204e4e4ff [MINOR] improvement(spark-client): put sparkConf as extra
properties while client request accessCluster (#2254)
204e4e4ff is described below
commit 204e4e4fff380f20c61f2ccbdb43695c3ece6564
Author: maobaolong <[email protected]>
AuthorDate: Tue Nov 26 15:35:41 2024 +0800
[MINOR] improvement(spark-client): put sparkConf as extra properties while
client request accessCluster (#2254)
### What changes were proposed in this pull request?
put sparkConf as extra properties while client request accessCluster
### Why are the changes needed?
The coordinator can decide whether a Spark application may access the RSS cluster by using a customized access checker that leverages some of the Spark configs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No need
---
.../java/org/apache/spark/shuffle/RssSparkConfig.java | 13 +++++++++++++
.../uniffle/shuffle/manager/RssShuffleManagerBase.java | 17 ++---------------
.../spark/shuffle/DelegationRssShuffleManager.java | 10 ++++++++++
.../spark/shuffle/DelegationRssShuffleManager.java | 10 ++++++++++
.../org/apache/uniffle/common/config/RssClientConf.java | 7 +++++++
5 files changed, 42 insertions(+), 15 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 45950a42e..4a7f653db 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -17,6 +17,8 @@
package org.apache.spark.shuffle;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import scala.Tuple2;
@@ -515,4 +517,15 @@ public class RssSparkConfig {
}
return rssConf;
}
+
+ public static Map<String, String> sparkConfToMap(SparkConf sparkConf) {
+ Map<String, String> map = new HashMap<>();
+
+ for (Tuple2<String, String> tuple : sparkConf.getAll()) {
+ String key = tuple._1;
+ map.put(key, tuple._2);
+ }
+
+ return map;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 8e921c66e..c1b697cbd 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -34,8 +34,6 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -1064,7 +1062,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
}
LOG.info("Start to register shuffleId {}", shuffleId);
long start = System.currentTimeMillis();
- Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
+ Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
serverToPartitionRanges.entrySet().stream()
.forEach(
entry -> {
@@ -1095,7 +1093,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
}
LOG.info("Start to register shuffleId[{}]", shuffleId);
long start = System.currentTimeMillis();
- Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
+ Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
serverToPartitionRanges.entrySet();
entries.stream()
@@ -1141,15 +1139,4 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
public SparkConf getSparkConf() {
return sparkConf;
}
-
- public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
- Map<String, String> map = new HashMap<>();
-
- for (Tuple2<String, String> tuple : sparkConf.getAll()) {
- String key = tuple._1;
- map.put(key, tuple._2);
- }
-
- return map;
- }
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 657be54d1..d2cb4038e 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
@@ -127,6 +130,13 @@ public class DelegationRssShuffleManager implements ShuffleManager {
extraProperties.put(
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM,
String.valueOf(assignmentShuffleNodesNum));
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ List<String> excludeProperties =
+ rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+ rssConf.getAll().stream()
+ .filter(entry -> !excludeProperties.contains(entry.getKey()))
+ .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
+
Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
try {
if (coordinatorClient != null) {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 622451e14..aaa2c1092 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
@@ -131,6 +134,13 @@ public class DelegationRssShuffleManager implements ShuffleManager {
extraProperties.put(
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM,
String.valueOf(assignmentShuffleNodesNum));
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ List<String> excludeProperties =
+ rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+ rssConf.getAll().stream()
+ .filter(entry -> !excludeProperties.contains(entry.getKey()))
+ .forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
+
Set<String> assignmentTags =
RssSparkShuffleUtils.getAssignmentTags(sparkConf);
try {
if (coordinatorClient != null) {
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 6d311a549..79b41afcc 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -303,4 +303,11 @@ public class RssClientConf {
.withDescription(
"The block id manager class of server for this application, "
+ "the implementation of this interface to manage the shuffle block ids");
+
+ public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES =
+ ConfigOptions.key("rss.client.reportExcludeProperties")
+ .stringType()
+ .asList()
+ .defaultValues()
+ .withDescription("the report exclude properties could be configured by this option");
}