This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b00b67c38 [MINOR] feat(spark-client): Support set accessId by another 
config dynamically (#2250)
b00b67c38 is described below

commit b00b67c388a5f658a61a10768574b4792dcb3e7d
Author: maobaolong <[email protected]>
AuthorDate: Mon Nov 18 20:13:25 2024 +0800

    [MINOR] feat(spark-client): Support set accessId by another config 
dynamically (#2250)
    
    ### What changes were proposed in this pull request?
    
    Support set accessId by another config dynamically.
    
    ### Why are the changes needed?
    
    Without this PR, we have to set the accessId explicitly for each user belonging to 
a `spark.yarn.queue`. After this feature is applied, `spark.rss.access.id` is set to the 
value of `spark.yarn.queue` if we set 
`spark.rss.access.id.providerKey=spark.yarn.queue`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Introduces a new Spark config `spark.rss.access.id.providerKey`.
    
    ### How was this patch tested?
    
    Updated UT.
---
 .../main/java/org/apache/spark/shuffle/RssSparkConfig.java   |  3 +++
 .../apache/spark/shuffle/DelegationRssShuffleManager.java    |  7 +++++--
 .../apache/spark/shuffle/DelegationRssShuffleManager.java    |  7 +++++++
 .../spark/shuffle/DelegationRssShuffleManagerTest.java       | 12 ++++++++++++
 docs/client_guide/spark_client_guide.md                      |  2 ++
 5 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 734dedccc..45950a42e 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -322,6 +322,9 @@ public class RssSparkConfig {
 
   public static final ConfigEntry<String> RSS_ACCESS_ID =
       createStringBuilder(new 
ConfigBuilder("spark.rss.access.id")).createWithDefault("");
+  public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY =
+      createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey"))
+          .createWithDefault("");
 
   public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
       createIntegerBuilder(
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index af9295e55..657be54d1 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -112,8 +112,11 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   private boolean tryAccessCluster() {
     String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
     if (StringUtils.isEmpty(accessId)) {
-      LOG.warn("Access id key is empty");
-      return false;
+      String providerKey =
+          sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+        accessId = sparkConf.get(providerKey, "");
+        LOG.info("Get access id {} from provider key: {}", accessId, 
providerKey);
+      }
     }
     long retryInterval = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
     int retryTimes = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index bb8ed3a90..622451e14 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -111,6 +111,13 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
 
   private boolean tryAccessCluster() {
     String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
+    if (StringUtils.isEmpty(accessId)) {
+      String providerKey = 
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+        accessId = sparkConf.get(providerKey, "");
+        LOG.info("Get access id {} from provider key: {}", accessId, 
providerKey);
+      }
+    }
     if (StringUtils.isEmpty(accessId)) {
       LOG.warn("Access id key is empty");
       return false;
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index c7011f27e..092869e31 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -47,15 +47,27 @@ public class DelegationRssShuffleManagerTest extends 
RssShuffleManagerTestBase {
 
     SparkConf conf = new SparkConf();
     assertCreateSortShuffleManager(conf);
+    conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set("spark.foo.bar.key", "mockId");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), 
"spark.foo.bar.key");
+    assertCreateSortShuffleManager(conf);
+
+    conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
     conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
     assertCreateSortShuffleManager(conf);
+
+    conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
     conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
     conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
     conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
     assertCreateRssShuffleManager(conf);
 
     conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
     conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
     assertCreateSortShuffleManager(conf);
   }
diff --git a/docs/client_guide/spark_client_guide.md 
b/docs/client_guide/spark_client_guide.md
index 3e504b51e..c189c064d 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -87,6 +87,8 @@ The important configuration is listed as following.
 | spark.rss.client.off.heap.memory.enable               | false   | The client 
use off heap memory to process data                                             
                                                                                
                                                                                
                                                                                
                                                           |
 | spark.rss.client.remote.storage.useLocalConfAsDefault | false   | This 
option is only valid when the remote storage path is specified. If true, the 
remote storage conf will use the client side hadoop configuration loaded from 
the classpath                                                                   
                                                                                
                                                                      |
 | spark.rss.hadoop.*                                    | -       | The prefix 
key for Hadoop conf. For Spark like that: 
`spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as 
`fs.defaultFS=hdfs://rbf-x1` for Hadoop storage                                 
                                                                                
                                                                                
                                  |
+| spark.rss.access.id                                   | -       | The access 
id for request access rss cluster. This is used for DelegationRssShuffleManager 
                                                                                
                                                                                
                                                                                
                                                           |
+| spark.rss.access.id.providerKey                       | -       | Get access 
id from the value of the given provider key. This is used for 
DelegationRssShuffleManager                                                     
                                                                                
                                                                                
                                                                             |
 
 ### Block id bits
 

Reply via email to