This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a36261296 [#2190] feat(client): Add support about setting extra system
properties in client side (#2191)
a36261296 is described below
commit a36261296b05d72e4a774d9c9555cc12b922be97
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Oct 16 13:36:55 2024 +0800
[#2190] feat(client): Add support about setting extra system properties in
client side (#2191)
### What changes were proposed in this pull request?
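Add support for setting extra Java system properties on the client side through the
new `rss.client.extraJavaSystemProperties` option, a list of `key=value` entries.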
### Why are the changes needed?
For: #2190
### Does this PR introduce _any_ user-facing change?
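Yes. It adds the client option `rss.client.extraJavaSystemProperties`; each configured
`key=value` entry is applied with `System.setProperty` by the Spark 2 and Spark 3
`RssShuffleManager`.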
### How was this patch tested?
Unit test (`RssUtilsTest#testSettingProperties`).
Co-authored-by: Junfan Zhang <[email protected]>
---
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 1 +
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 1 +
.../org/apache/uniffle/common/config/RssClientConf.java | 9 +++++++++
.../java/org/apache/uniffle/common/util/RssUtils.java | 16 ++++++++++++++++
.../org/apache/uniffle/common/util/RssUtilsTest.java | 13 +++++++++++++
5 files changed, 40 insertions(+)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 68f3bd4b7..c21cd0b56 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -128,6 +128,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
this.speculation = sparkConf.getBoolean("spark.speculation", false);
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ RssUtils.setExtraJavaProperties(rssConf);
// configureBlockIdLayout requires maxFailures and speculation to be
initialized
configureBlockIdLayout(sparkConf, rssConf);
this.blockIdLayout = BlockIdLayout.from(rssConf);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d7725113f..eb724484d 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -162,6 +162,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dataDistributionType = getDataDistributionType(sparkConf);
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ RssUtils.setExtraJavaProperties(rssConf);
this.maxConcurrencyPerPartitionToWrite =
rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
this.speculation = sparkConf.getBoolean("spark.speculation", false);
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index c0ade1073..58b6d6a91 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.common.config;
+import java.util.List;
+
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.compression.Codec;
@@ -286,4 +288,11 @@ public class RssClientConf {
.intType()
.defaultValue(500)
.withDescription("the max size of records per buffer when fetch
remote merge records");
+
+ public static final ConfigOption<List<String>>
RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES =
+ ConfigOptions.key("rss.client.extraJavaSystemProperties")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("the extra java properties could be configured by
this option");
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 121fe10de..eb0bc4a4c 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import io.netty.channel.unix.Errors;
import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -60,6 +61,8 @@ import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.ServerInterface;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES;
+
public class RssUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(RssUtils.class);
@@ -428,4 +431,17 @@ public class RssUtils {
Constructor<?> constructor = klass.getConstructor(parameterTypes);
return constructor;
}
+
+ public static void setExtraJavaProperties(RssConf conf) {
+ List<String> properties =
conf.get(RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES);
+ if (CollectionUtils.isEmpty(properties)) {
+ return;
+ }
+
+ for (String propertyKv : properties) {
+ LOGGER.info("Setting system property: {}", propertyKv);
+ String[] raw = propertyKv.split("=", 2);
+ System.setProperty(raw[0], raw[1]);
+ }
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index d9bfddb72..1d2bed8c6 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -47,6 +47,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerInterface;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -280,6 +281,18 @@ public class RssUtilsTest {
assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
}
+ @Test
+ public void testSettingProperties() {
+ RssConf conf = new RssConf();
+ conf.set(RSS_CLIENT_EXTRA_JAVA_SYSTEM_PROPERTIES, Arrays.asList("k1=v1",
"k2=v2"));
+
+ assertNull(System.getProperty("k1"));
+ assertNull(System.getProperty("k2"));
+ RssUtils.setExtraJavaProperties(conf);
+ assertEquals("v1", System.getProperty("k1"));
+ assertEquals("v2", System.getProperty("k2"));
+ }
+
@Test
public void testGetConfiguredLocalDirs() throws Exception {
RssConf conf = new RssConf();