This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a36261296 [#2190] feat(client): Add support about setting extra system 
properties in client side (#2191)
a36261296 is described below

commit a36261296b05d72e4a774d9c9555cc12b922be97
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Oct 16 13:36:55 2024 +0800

    [#2190] feat(client): Add support about setting extra system properties in 
client side (#2191)
    
    ### What changes were proposed in this pull request?
    
    Add support for setting extra Java system properties on the client side
    via the `rss.client.extraJavaSystemProperties` option.
    
    ### Why are the changes needed?
    
    For: #2190
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes
    
    ### How was this patch tested?
    
    Unit test (`RssUtilsTest#testSettingProperties`).
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../java/org/apache/spark/shuffle/RssShuffleManager.java |  1 +
 .../java/org/apache/spark/shuffle/RssShuffleManager.java |  1 +
 .../org/apache/uniffle/common/config/RssClientConf.java  |  9 +++++++++
 .../java/org/apache/uniffle/common/util/RssUtils.java    | 16 ++++++++++++++++
 .../org/apache/uniffle/common/util/RssUtilsTest.java     | 13 +++++++++++++
 5 files changed, 40 insertions(+)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 68f3bd4b7..c21cd0b56 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -128,6 +128,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    RssUtils.setExtraJavaProperties(rssConf);
     // configureBlockIdLayout requires maxFailures and speculation to be 
initialized
     configureBlockIdLayout(sparkConf, rssConf);
     this.blockIdLayout = BlockIdLayout.from(rssConf);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d7725113f..eb724484d 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -162,6 +162,7 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dataDistributionType = getDataDistributionType(sparkConf);
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    RssUtils.setExtraJavaProperties(rssConf);
     this.maxConcurrencyPerPartitionToWrite = 
rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index c0ade1073..58b6d6a91 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.common.config;
 
+import java.util.List;
+
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.compression.Codec;
@@ -286,4 +288,11 @@ public class RssClientConf {
           .intType()
           .defaultValue(500)
           .withDescription("the max size of records per buffer when fetch 
remote merge records");
+
+  public static final ConfigOption<List<String>> 
RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES =
+      ConfigOptions.key("rss.client.extraJavaSystemProperties")
+          .stringType()
+          .asList()
+          .noDefaultValue()
+          .withDescription("the extra java properties could be configured by 
this option");
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 121fe10de..eb0bc4a4c 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
 import io.netty.channel.unix.Errors;
 import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.collections4.CollectionUtils;
 import org.eclipse.jetty.util.MultiException;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -60,6 +61,8 @@ import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.ServerInterface;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES;
+
 public class RssUtils {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RssUtils.class);
@@ -428,4 +431,17 @@ public class RssUtils {
     Constructor<?> constructor = klass.getConstructor(parameterTypes);
     return constructor;
   }
+
+  /**
+   * Applies the entries of {@code RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES} as JVM system
+   * properties.
+   *
+   * <p>Each entry is expected to have the form {@code key=value}; only the first {@code '='}
+   * separates key from value, so the value itself may contain {@code '='}. Entries that are
+   * null, contain no {@code '='}, or have an empty key are skipped with a warning instead of
+   * failing the caller with an {@link ArrayIndexOutOfBoundsException} or
+   * {@link IllegalArgumentException}.
+   *
+   * @param conf the configuration to read the property list from
+   */
+  public static void setExtraJavaProperties(RssConf conf) {
+    List<String> properties = conf.get(RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES);
+    if (CollectionUtils.isEmpty(properties)) {
+      return;
+    }
+
+    for (String propertyKv : properties) {
+      // A separator index of -1 (missing '=') or 0 (empty key) cannot yield a usable key.
+      int separator = propertyKv == null ? -1 : propertyKv.indexOf('=');
+      if (separator <= 0) {
+        LOGGER.warn(
+            "Ignoring malformed system property, expected the format of key=value: {}",
+            propertyKv);
+        continue;
+      }
+      LOGGER.info("Setting system property: {}", propertyKv);
+      System.setProperty(
+          propertyKv.substring(0, separator), propertyKv.substring(separator + 1));
+    }
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index d9bfddb72..1d2bed8c6 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -47,6 +47,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.rpc.ServerInterface;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -280,6 +281,18 @@ public class RssUtilsTest {
     assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
   }
 
+  /**
+   * Verifies that {@code key=value} entries configured through
+   * {@code RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES} become JVM system properties.
+   */
+  @Test
+  public void testSettingProperties() {
+    RssConf conf = new RssConf();
+    conf.set(RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES, Arrays.asList("k1=v1", "k2=v2"));
+
+    assertNull(System.getProperty("k1"));
+    assertNull(System.getProperty("k2"));
+    try {
+      RssUtils.setExtraJavaProperties(conf);
+      assertEquals("v1", System.getProperty("k1"));
+      assertEquals("v2", System.getProperty("k2"));
+    } finally {
+      // System properties are JVM-global: clear them so that other tests (and re-runs of
+      // this one in the same JVM) still observe k1/k2 as unset.
+      System.clearProperty("k1");
+      System.clearProperty("k2");
+    }
+  }
+
   @Test
   public void testGetConfiguredLocalDirs() throws Exception {
     RssConf conf = new RssConf();

Reply via email to