This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 7fd296a66dab36b546f8cbc4e878d767b44bcb9b Author: maobaolong <[email protected]> AuthorDate: Mon Nov 18 20:13:25 2024 +0800 [MINOR] feat(spark-client): Support set accessId by another config dynamically (#2250) ### What changes were proposed in this pull request? Support setting the accessId from another config dynamically. ### Why are the changes needed? Without this PR, we have to set accessId for each user belonging to `spark.yarn.queue`; with this feature, `spark.rss.access.id` is set to the value of `spark.yarn.queue` if we set `spark.rss.access.id.providerKey=spark.yarn.queue` ### Does this PR introduce _any_ user-facing change? Introduces a new Spark config `spark.rss.access.id.providerKey`. ### How was this patch tested? Updated UT. --- .../main/java/org/apache/spark/shuffle/RssSparkConfig.java | 3 +++ .../apache/spark/shuffle/DelegationRssShuffleManager.java | 7 +++++-- .../apache/spark/shuffle/DelegationRssShuffleManager.java | 7 +++++++ .../spark/shuffle/DelegationRssShuffleManagerTest.java | 12 ++++++++++++ docs/client_guide/spark_client_guide.md | 2 ++ 5 files changed, 29 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 734dedccc..45950a42e 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -322,6 +322,9 @@ public class RssSparkConfig { public static final ConfigEntry<String> RSS_ACCESS_ID = createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault(""); + public static final ConfigEntry<String> RSS_ACCESS_ID_PROVIDER_KEY = + createStringBuilder(new ConfigBuilder("spark.rss.access.id.providerKey")) + .createWithDefault(""); public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS = createIntegerBuilder( diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index af9295e55..657be54d1 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -112,8 +112,11 @@ public class DelegationRssShuffleManager implements ShuffleManager { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); if (StringUtils.isEmpty(accessId)) { - LOG.warn("Access id key is empty"); - return false; + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), ""); + if (StringUtils.isNotEmpty(providerKey)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get access id {} from provider key: {}", accessId, providerKey); + } } long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS); int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java index bb8ed3a90..622451e14 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java @@ -111,6 +111,13 @@ public class DelegationRssShuffleManager implements ShuffleManager { private boolean tryAccessCluster() { String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim(); + if (StringUtils.isEmpty(accessId)) { + String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), ""); + if (StringUtils.isNotEmpty(providerKey)) { + accessId = sparkConf.get(providerKey, ""); + LOG.info("Get 
access id {} from provider key: {}", accessId, providerKey); + } + } if (StringUtils.isEmpty(accessId)) { LOG.warn("Access id key is empty"); return false; diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java index c7011f27e..092869e31 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java @@ -47,15 +47,27 @@ public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase { SparkConf conf = new SparkConf(); assertCreateSortShuffleManager(conf); + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set("spark.foo.bar.key", "mockId"); + conf.set(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "spark.foo.bar.key"); + assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); assertCreateSortShuffleManager(conf); + + conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); + conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name()); conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true); assertCreateRssShuffleManager(conf); conf = new SparkConf(); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false"); conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002"); assertCreateSortShuffleManager(conf); } diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md index 3e504b51e..c189c064d 100644 --- 
a/docs/client_guide/spark_client_guide.md +++ b/docs/client_guide/spark_client_guide.md @@ -87,6 +87,8 @@ The important configuration is listed as following. | spark.rss.client.off.heap.memory.enable | false | The client use off heap memory to process data | | spark.rss.client.remote.storage.useLocalConfAsDefault | false | This option is only valid when the remote storage path is specified. If ture, the remote storage conf will use the client side hadoop configuration loaded from the classpath | | spark.rss.hadoop.* | - | The prefix key for Hadoop conf. For Spark like that: `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as `fs.defaultFS=hdfs://rbf-x1` for Hadoop storage | +| spark.rss.access.id | - | The access id used to request access to the rss cluster. This is used for DelegationRssShuffleManager | +| spark.rss.access.id.providerKey | - | Get the access id from the value of the given provider key. This is used for DelegationRssShuffleManager | ### Block id bits
