This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fd27a0834 [#2254][FOLLOWUP] improvement(spark):Add include key filter
before report extraProperties (#2265)
fd27a0834 is described below
commit fd27a0834713dc5c3825560347fd70010ab71d2e
Author: maobaolong <[email protected]>
AuthorDate: Sat Jan 4 14:04:47 2025 +0800
[#2254][FOLLOWUP] improvement(spark):Add include key filter before report
extraProperties (#2265)
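### What changes were proposed in this pull request?
- Add an include-key filter that is applied before extraProperties are reported to the coordinator
- Document the related client options
### Why are the changes needed?
- Allow users to report only an explicit list of client properties (include keys), complementing the existing exclude list
- Add documentation for the include and exclude options.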
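### Does this PR introduce _any_ user-facing change?
No behavior change by default. A new optional client option, `rss.client.reportIncludeProperties`, is added. It has no default value, so reporting is unchanged unless the option is set.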
### How was this patch tested?
New unit tests in DelegationRssShuffleManagerTest, plus a manual run on our test cluster.
- Client conf
```
--conf
spark.shuffle.manager=org.apache.spark.shuffle.DelegationRssShuffleManager
--conf spark.rss.access.id="access.id"
--conf
spark.rss.client.reportIncludeProperties="spark.test.key,test2.key,app.id,yarn.queue"
--conf spark.rss.client.reportExcludeProperties="app.id"
```
- Coordinator_rpc_audit.log
```
[2024-11-26 21:14:45.632] cmd=accessCluster statusCode=ACCESS_DENIED
from=/XXX:34733 executionTimeUs=276(<1s) appId=N/A
args{accessInfo=AccessInfo{accessId='rss-test-mbl-003-bannedid', user=
tdwadmin, tags=[ss_v5],
extraProperties={access_info_required_shuffle_nodes_num=-1,
yarn.queue=g_teg_tdw_operlog-offline, spark.test.key=123_456}}}
```
---
.../spark/shuffle/DelegationRssShuffleManager.java | 3 +
.../spark/shuffle/DelegationRssShuffleManager.java | 3 +
.../shuffle/DelegationRssShuffleManagerTest.java | 86 ++++++++++++++++++++++
.../spark/shuffle/RssShuffleManagerTestBase.java | 3 +-
.../uniffle/common/config/RssClientConf.java | 7 ++
docs/client_guide/client_guide.md | 2 +
6 files changed, 103 insertions(+), 1 deletion(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index d2cb4038e..7f2af3ef2 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -133,7 +133,10 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
List<String> excludeProperties =
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+ List<String> includeProperties =
+ rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
rssConf.getAll().stream()
+ .filter(entry -> includeProperties == null ||
includeProperties.contains(entry.getKey()))
.filter(entry -> !excludeProperties.contains(entry.getKey()))
.forEach(entry -> extraProperties.put(entry.getKey(), (String)
entry.getValue()));
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index dfdf587c3..593c8a29a 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -137,7 +137,10 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
List<String> excludeProperties =
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+ List<String> includeProperties =
+ rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
rssConf.getAll().stream()
+ .filter(entry -> includeProperties == null ||
includeProperties.contains(entry.getKey()))
.filter(entry -> !excludeProperties.contains(entry.getKey()))
.forEach(entry -> extraProperties.put(entry.getKey(), (String)
entry.getValue()));
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 092869e31..a38bfad7b 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -22,7 +22,11 @@ import java.util.NoSuchElementException;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.request.RssAccessClusterRequest;
+import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
@@ -30,6 +34,7 @@ import static
org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase
{
@@ -131,6 +136,87 @@ public class DelegationRssShuffleManagerTest extends
RssShuffleManagerTestBase {
assertCreateSortShuffleManager(secondConf);
}
+ /**
+ * With neither reportIncludeProperties nor reportExcludeProperties configured, every
+ * SparkConf entry is expected in the access-cluster request's extraProperties, plus one
+ * additional entry that is not subject to the include/exclude filters (presumably
+ * access_info_required_shuffle_nodes_num, as in the audit log sample - confirm).
+ */
+ @Test
+ public void testDefaultIncludeExcludeProperties() throws Exception {
+ final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+ SparkConf conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+ // Snapshot the key count before the manager is created; it is the expected baseline.
+ final int confInitKeyCount = conf.getAll().length;
+ assertCreateRssShuffleManager(conf);
+
+ // default case: access cluster should include all properties in conf and
an extra one.
+ ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+ ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+ verify(mockClient).accessCluster(argumentCaptor.capture());
+ RssAccessClusterRequest request = argumentCaptor.getValue();
+ assertEquals(confInitKeyCount + 1, request.getExtraProperties().size());
+ }
+
+ /**
+ * Configures reportIncludeProperties to contain only the access-id key and expects the
+ * access-cluster request to carry just that key plus the one additional entry that is
+ * not subject to the include/exclude filters.
+ */
+ @Test
+ public void testIncludeProperties() throws Exception {
+ final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+ SparkConf conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+ // test include properties: the include list holds the access-id key with
+ // SPARK_RSS_CONFIG_PREFIX stripped, presumably because the filter matches keys of the
+ // RssConf built by RssSparkConfig.toRssConf, which drops that prefix - confirm.
+ conf.set(
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ + RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(),
+ RssSparkConfig.RSS_ACCESS_ID
+ .key()
+ .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
+ assertCreateRssShuffleManager(conf);
+
+ ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+ ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+
+ verify(mockClient).accessCluster(argumentCaptor.capture());
+ RssAccessClusterRequest request = argumentCaptor.getValue();
+ // only the included access id plus the extra entry that bypasses the filters
+ assertEquals(1 + 1, request.getExtraProperties().size());
+ }
+
+ /**
+ * Configures reportExcludeProperties to contain only the access-id key and expects every
+ * other SparkConf entry (plus the one additional unfiltered entry) to be reported.
+ */
+ @Test
+ public void testExcludeProperties() throws Exception {
+ final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+ SparkConf conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+ // test exclude properties: exclude the access-id key (Spark rss prefix stripped)
+ conf.set(
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ + RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(),
+ RssSparkConfig.RSS_ACCESS_ID
+ .key()
+ .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
+ // Counted after the exclude setting, so the setting itself is part of the baseline.
+ final int confInitKeyCount = conf.getAll().length;
+ assertCreateRssShuffleManager(conf);
+
+ ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+ ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+
+ verify(mockClient).accessCluster(argumentCaptor.capture());
+ RssAccessClusterRequest request = argumentCaptor.getValue();
+ // every conf entry plus the unfiltered extra entry, minus the excluded access id
+ assertEquals(confInitKeyCount + 1 - 1,
request.getExtraProperties().size());
+ }
+
private void assertCreateSortShuffleManager(SparkConf conf) throws Exception
{
DelegationRssShuffleManager delegationRssShuffleManager =
new DelegationRssShuffleManager(conf, true);
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
index 09cf24cf8..4b7cb6202 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
@@ -56,7 +56,7 @@ public class RssShuffleManagerTestBase {
return mockedCoordinatorClient;
}
- void setupMockedRssShuffleUtils(StatusCode status) {
+ CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) {
CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status);
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
coordinatorClients.add(mockCoordinatorClient);
@@ -65,5 +65,6 @@ public class RssShuffleManagerTestBase {
mockedStaticRssShuffleUtils
.when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
.thenReturn(client);
+ return mockCoordinatorClient;
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 79b41afcc..36d80b6e5 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -310,4 +310,11 @@ public class RssClientConf {
.asList()
.defaultValues()
.withDescription("the report exclude properties could be configured
by this option");
+
+ /**
+ * Allow-list of configuration keys that DelegationRssShuffleManager reports to the
+ * coordinator as extraProperties. The option has no default value. The client code treats
+ * a null value as "no include filtering", so all keys pass this stage when it is unset.
+ * RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES is applied after this filter.
+ */
+ public static final ConfigOption<List<String>>
RSS_CLIENT_REPORT_INCLUDE_PROPERTIES =
+ ConfigOptions.key("rss.client.reportIncludeProperties")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("the report include properties could be configured
by this option");
}
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 371528931..4092df454 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -60,6 +60,8 @@ The important configuration of client is listed as following.
These configuratio
| <client_type>.rss.client.rpc.netty.maxOrder | 3
| The value of maxOrder for PooledByteBufAllocator
when using gRPC internal Netty on the client-side. This configuration will only
take effect when rss.rpc.server.type is set to GRPC_NETTY.
[...]
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024
| The value of smallCacheSize for
PooledByteBufAllocator when using gRPC internal Netty on the client-side. This
configuration will only take effect when rss.rpc.server.type is set to
GRPC_NETTY.
[...]
| <client_type>.rss.client.blockIdManagerClass | -
| The block id manager class of server for this
application, the implementation of this interface to manage the shuffle block
ids
[...]
+| <client_type>.rss.client.reportExcludeProperties | -
| A list of client configuration property keys that must
not be reported to the coordinator by the DelegationRssShuffleManager. It is
applied after reportIncludeProperties.
[...]
+| <client_type>.rss.client.reportIncludeProperties | -
| A list of client configuration property keys to report to
the coordinator by the DelegationRssShuffleManager. When set, only these
configuration keys are reported (keys listed in reportExcludeProperties are still
removed). When unset, no include filtering is applied.
[...]
Notice: