This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fd27a0834 [#2254][FOLLOWUP] improvement(spark):Add include key filter 
before report extraProperties (#2265)
fd27a0834 is described below

commit fd27a0834713dc5c3825560347fd70010ab71d2e
Author: maobaolong <[email protected]>
AuthorDate: Sat Jan 4 14:04:47 2025 +0800

    [#2254][FOLLOWUP] improvement(spark):Add include key filter before report 
extraProperties (#2265)
    
    ### What changes were proposed in this pull request?
    
    - Add an include-key filter that is applied before reporting extraProperties
    - Add related docs
    
    ### Why are the changes needed?
    
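    - Allow users to restrict the extraProperties reported to the coordinator to an explicit list of keys (include filter)
    - Add documentation for the exclude/include options.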
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested on our test cluster.
    
    - Client conf
    ```
    --conf 
spark.shuffle.manager=org.apache.spark.shuffle.DelegationRssShuffleManager 
--conf spark.rss.access.id="access.id"
    --conf 
spark.rss.client.reportIncludeProperties="spark.test.key,test2.key,app.id,yarn.queue"
 --conf spark.rss.client.reportExcludeProperties="app.id"
    ```
    
    - Coordinator_rpc_audit.log
    
    ```
    [2024-11-26 21:14:45.632] cmd=accessCluster     statusCode=ACCESS_DENIED    
    from=/XXX:34733 executionTimeUs=276(<1s)        appId=N/A       
args{accessInfo=AccessInfo{accessId='rss-test-mbl-003-bannedid', user= 
tdwadmin, tags=[ss_v5], 
extraProperties={access_info_required_shuffle_nodes_num=-1, 
yarn.queue=g_teg_tdw_operlog-offline, spark.test.key=123_456}}}
    ```
---
 .../spark/shuffle/DelegationRssShuffleManager.java |  3 +
 .../spark/shuffle/DelegationRssShuffleManager.java |  3 +
 .../shuffle/DelegationRssShuffleManagerTest.java   | 86 ++++++++++++++++++++++
 .../spark/shuffle/RssShuffleManagerTestBase.java   |  3 +-
 .../uniffle/common/config/RssClientConf.java       |  7 ++
 docs/client_guide/client_guide.md                  |  2 +
 6 files changed, 103 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index d2cb4038e..7f2af3ef2 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -133,7 +133,10 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     List<String> excludeProperties =
         rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+    List<String> includeProperties =
+        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
     rssConf.getAll().stream()
+        .filter(entry -> includeProperties == null || 
includeProperties.contains(entry.getKey()))
         .filter(entry -> !excludeProperties.contains(entry.getKey()))
         .forEach(entry -> extraProperties.put(entry.getKey(), (String) 
entry.getValue()));
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index dfdf587c3..593c8a29a 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -137,7 +137,10 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     List<String> excludeProperties =
         rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
+    List<String> includeProperties =
+        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
     rssConf.getAll().stream()
+        .filter(entry -> includeProperties == null || 
includeProperties.contains(entry.getKey()))
         .filter(entry -> !excludeProperties.contains(entry.getKey()))
         .forEach(entry -> extraProperties.put(entry.getKey(), (String) 
entry.getValue()));
 
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 092869e31..a38bfad7b 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -22,7 +22,11 @@ import java.util.NoSuchElementException;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.sort.SortShuffleManager;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
+import org.apache.uniffle.client.api.CoordinatorClient;
+import org.apache.uniffle.client.request.RssAccessClusterRequest;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.apache.uniffle.common.rpc.StatusCode.ACCESS_DENIED;
@@ -30,6 +34,7 @@ import static 
org.apache.uniffle.common.rpc.StatusCode.SUCCESS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
 
 public class DelegationRssShuffleManagerTest extends RssShuffleManagerTestBase 
{
 
@@ -131,6 +136,87 @@ public class DelegationRssShuffleManagerTest extends 
RssShuffleManagerTestBase {
     assertCreateSortShuffleManager(secondConf);
   }
 
+  @Test
+  public void testDefaultIncludeExcludeProperties() throws Exception {
+    final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+    final int confInitKeyCount = conf.getAll().length;
+    assertCreateRssShuffleManager(conf);
+
+    // default case: access cluster should include all properties in conf and 
an extra one.
+    ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+        ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+    verify(mockClient).accessCluster(argumentCaptor.capture());
+    RssAccessClusterRequest request = argumentCaptor.getValue();
+    assertEquals(confInitKeyCount + 1, request.getExtraProperties().size());
+  }
+
+  @Test
+  public void testIncludeProperties() throws Exception {
+    final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+    // test include properties
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+            + RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES.key(),
+        RssSparkConfig.RSS_ACCESS_ID
+            .key()
+            .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
+    assertCreateRssShuffleManager(conf);
+
+    ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+        ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+
+    verify(mockClient).accessCluster(argumentCaptor.capture());
+    RssAccessClusterRequest request = argumentCaptor.getValue();
+    // only accessId and extra one
+    assertEquals(1 + 1, request.getExtraProperties().size());
+  }
+
+  @Test
+  public void testExcludeProperties() throws Exception {
+    final CoordinatorClient mockClient = setupMockedRssShuffleUtils(SUCCESS);
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+    // test exclude properties
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+            + RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES.key(),
+        RssSparkConfig.RSS_ACCESS_ID
+            .key()
+            .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()));
+    final int confInitKeyCount = conf.getAll().length;
+    assertCreateRssShuffleManager(conf);
+
+    ArgumentCaptor<RssAccessClusterRequest> argumentCaptor =
+        ArgumentCaptor.forClass(RssAccessClusterRequest.class);
+
+    verify(mockClient).accessCluster(argumentCaptor.capture());
+    RssAccessClusterRequest request = argumentCaptor.getValue();
+    // all accessId and extra one except the excluded one
+    assertEquals(confInitKeyCount + 1 - 1, 
request.getExtraProperties().size());
+  }
+
   private void assertCreateSortShuffleManager(SparkConf conf) throws Exception 
{
     DelegationRssShuffleManager delegationRssShuffleManager =
         new DelegationRssShuffleManager(conf, true);
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
index 09cf24cf8..4b7cb6202 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
@@ -56,7 +56,7 @@ public class RssShuffleManagerTestBase {
     return mockedCoordinatorClient;
   }
 
-  void setupMockedRssShuffleUtils(StatusCode status) {
+  CoordinatorClient setupMockedRssShuffleUtils(StatusCode status) {
     CoordinatorClient mockCoordinatorClient = createCoordinatorClient(status);
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockCoordinatorClient);
@@ -65,5 +65,6 @@ public class RssShuffleManagerTestBase {
     mockedStaticRssShuffleUtils
         .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
         .thenReturn(client);
+    return mockCoordinatorClient;
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 79b41afcc..36d80b6e5 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -310,4 +310,11 @@ public class RssClientConf {
           .asList()
           .defaultValues()
           .withDescription("the report exclude properties could be configured 
by this option");
+
+  public static final ConfigOption<List<String>> 
RSS_CLIENT_REPORT_INCLUDE_PROPERTIES =
+      ConfigOptions.key("rss.client.reportIncludeProperties")
+          .stringType()
+          .asList()
+          .noDefaultValue()
+          .withDescription("the report include properties could be configured 
by this option");
 }
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index 371528931..4092df454 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -60,6 +60,8 @@ The important configuration of client is listed as following. 
These configuratio
 | <client_type>.rss.client.rpc.netty.maxOrder                     | 3          
                            | The value of maxOrder for PooledByteBufAllocator 
when using gRPC internal Netty on the client-side. This configuration will only 
take effect when rss.rpc.server.type is set to GRPC_NETTY.                      
                                                                                
                                                                                
               [...]
 | <client_type>.rss.client.rpc.netty.smallCacheSize               | 1024       
                            | The value of smallCacheSize for 
PooledByteBufAllocator when using gRPC internal Netty on the client-side. This 
configuration will only take effect when rss.rpc.server.type is set to 
GRPC_NETTY.                                                                     
                                                                                
                                          [...]
 | <client_type>.rss.client.blockIdManagerClass                    | -          
                            | The block id manager class of server for this 
application, the implementation of this interface to manage the shuffle block 
ids                                                                             
                                                                                
                                                                                
                    [...]
+| <client_type>.rss.client.reportExcludeProperties                | -          
                            | The value of exclude properties specifies a list of 
client configuration properties that should not be reported to the coordinator 
by the DelegationRssShuffleManager.                                             
                                                                                
                                                                                
               [...]
+| <client_type>.rss.client.reportIncludeProperties                | -          
                            | The value of include properties specifies a list of 
client configuration properties that should be exclusively reported to the 
coordinator by the DelegationRssShuffleManager.                                 
                                                                                
                                                                                
                   [...]
 
 Notice:
 

Reply via email to