This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b00b67c38 [MINOR] feat(spark-client): Support set accessId by another
config dynamically (#2250)
b00b67c38 is described below
commit b00b67c388a5f658a61a10768574b4792dcb3e7d
Author: maobaolong <[email protected]>
AuthorDate: Mon Nov 18 20:13:25 2024 +0800
[MINOR] feat(spark-client): Support set accessId by another config
dynamically (#2250)
### What changes were proposed in this pull request?
Support set accessId by another config dynamically.
### Why are the changes needed?
Without this PR, we have to set an accessId separately for every user belonging to
`spark.yarn.queue`. With this feature, `spark.rss.access.id` is resolved
dynamically from the value of `spark.yarn.queue` when we set
`spark.rss.access.id.providerKey=spark.yarn.queue`.
### Does this PR introduce _any_ user-facing change?
Introduces a new Spark config, `spark.rss.access.id.providerKey`.
### How was this patch tested?
Updated UT.
---
.../main/java/org/apache/spark/shuffle/RssSparkConfig.java | 3 +++
.../apache/spark/shuffle/DelegationRssShuffleManager.java | 7 +++++--
.../apache/spark/shuffle/DelegationRssShuffleManager.java | 7 +++++++
.../spark/shuffle/DelegationRssShuffleManagerTest.java | 12 ++++++++++++
docs/client_guide/spark_client_guide.md | 2 ++
5 files changed, 29 insertions(+), 2 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 734dedccc..45950a42e 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -322,6 +322,9 @@ public class RssSparkConfig {
public static final ConfigEntry<String> RSS_ACCESS_ID =
createStringBuilder(new
ConfigBuilder("spark.rss.access.id")).createWithDefault("");
+ public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY =
+ createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey"))
+ .createWithDefault("");
public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
createIntegerBuilder(
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index af9295e55..657be54d1 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -112,8 +112,11 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private boolean tryAccessCluster() {
String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
if (StringUtils.isEmpty(accessId)) {
- LOG.warn("Access id key is empty");
- return false;
+      String providerKey =
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+ accessId = sparkConf.get(providerKey, "");
+ LOG.info("Get access id {} from provider key: {}", accessId,
providerKey);
+ }
}
long retryInterval =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
int retryTimes =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index bb8ed3a90..622451e14 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -111,6 +111,13 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private boolean tryAccessCluster() {
String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
+ if (StringUtils.isEmpty(accessId)) {
+ String providerKey =
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+ if (StringUtils.isNotEmpty(providerKey)) {
+ accessId = sparkConf.get(providerKey, "");
+ LOG.info("Get access id {} from provider key: {}", accessId,
providerKey);
+ }
+ }
if (StringUtils.isEmpty(accessId)) {
LOG.warn("Access id key is empty");
return false;
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index c7011f27e..092869e31 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -47,15 +47,27 @@ public class DelegationRssShuffleManagerTest extends
RssShuffleManagerTestBase {
SparkConf conf = new SparkConf();
assertCreateSortShuffleManager(conf);
+ conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set("spark.foo.bar.key", "mockId");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(),
"spark.foo.bar.key");
+ assertCreateSortShuffleManager(conf);
+
+ conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
assertCreateSortShuffleManager(conf);
+
+ conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateRssShuffleManager(conf);
conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
assertCreateSortShuffleManager(conf);
}
diff --git a/docs/client_guide/spark_client_guide.md
b/docs/client_guide/spark_client_guide.md
index 3e504b51e..c189c064d 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -87,6 +87,8 @@ The important configuration is listed as following.
| spark.rss.client.off.heap.memory.enable | false | The client
use off heap memory to process data
|
| spark.rss.client.remote.storage.useLocalConfAsDefault | false | This
option is only valid when the remote storage path is specified. If ture, the
remote storage conf will use the client side hadoop configuration loaded from
the classpath
|
| spark.rss.hadoop.* | - | The prefix
key for Hadoop conf. For Spark like that:
`spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as
`fs.defaultFS=hdfs://rbf-x1` for Hadoop storage
|
+| spark.rss.access.id | - | The access
id for request access rss cluster. This is used for DelegationRssShuffleManager
|
+| spark.rss.access.id.providerKey | - | Get access
id from the value of the given provider key. This is used for
DelegationRssShuffleManager
|
### Block id bits