This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new f5261d3f9 [#2003][FOLLOWUP] fix(client): Replace rss.rpc.client.type.grpc.timeout with rss.client.rpc.timeout.ms in RssShuffleManagerBase (#2004) f5261d3f9 is described below commit f5261d3f902aa23aabd0fc5cdeb3c35c9bdeb4d5 Author: xumanbu <jam...@vipshop.com> AuthorDate: Fri Aug 2 20:29:16 2024 +0800 [#2003][FOLLOWUP] fix(client): Replace rss.rpc.client.type.grpc.timeout with rss.client.rpc.timeout.ms in RssShuffleManagerBase (#2004) ### What changes were proposed in this pull request? 1. Configure `rpcTimeout` in ShuffleManagerClient using `rss.client.rpc.timeout.ms`. 2. Remove the `rss.rpc.client.type.grpc.timeout` configuration. ### Why are the changes needed? Fix: #2003 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT --- .../org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java | 3 +-- .../main/java/org/apache/uniffle/common/config/RssBaseConf.java | 8 -------- .../main/java/org/apache/uniffle/common/config/RssClientConf.java | 3 ++- docs/client_guide/client_guide.md | 2 +- .../org/apache/uniffle/test/ShuffleServerManagerTestBase.java | 4 ++-- 5 files changed, 6 insertions(+), 14 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index d314b9bb6..5d5fcdfa2 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -79,7 +79,6 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.ConfigOption; -import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; @@ -624,7 +623,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac RssConf rssConf = RssSparkConfig.toRssConf(sparkConf); String driver = rssConf.getString("driver.host", ""); int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT); - long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS); + long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS); this.managerClientSupplier = ExpiringCloseableSupplier.of( () -> diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java index d41adae29..71d2d5460 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java @@ -157,14 +157,6 @@ public class RssBaseConf extends RssConf { .defaultValue(ClientType.GRPC) .withDescription("client type for rss"); - public static final ConfigOption<Long> RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS = - ConfigOptions.key("rss.rpc.client.type.grpc.timeout") - .longType() - .checkValue( - ConfigUtils.POSITIVE_LONG_VALIDATOR, "The grpc timeout must be positive integer") - .defaultValue(60 * 1000L) - .withDescription("Remote shuffle service client type grpc timeout (ms)"); - public static final ConfigOption<StorageType> RSS_STORAGE_TYPE = ConfigOptions.key("rss.storage.type") .enumType(StorageType.class) diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 570ea50c0..1904cf743 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -100,7 +100,8 @@ public class RssClientConf { ConfigOptions.key("rss.client.rpc.timeout.ms") .longType() .defaultValue(60 * 1000L) - .withDescription("Timeout in milliseconds for RPC calls."); + .withDescription( + "The timeout value in milliseconds for gRPC and Netty Type RPC Clients, including ShuffleServerClient and ShuffleManagerClient."); public static final ConfigOption<Integer> RPC_MAX_ATTEMPTS = ConfigOptions.key("rss.client.rpc.maxAttempts") diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md index b33aebc1b..ccf4e437e 100644 --- a/docs/client_guide/client_guide.md +++ b/docs/client_guide/client_guide.md @@ -54,7 +54,7 @@ The important configuration of client is listed as following. These configuratio | <client_type>.rss.estimate.server.assignment.enabled | false | Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks. [...] | <client_type>.rss.estimate.task.concurrency.per.server | 80 | It takes effect when rss.estimate.server.assignment.enabled=true, how many tasks are concurrently assigned to a ShuffleServer. [...] | <client_type>.rss.client.max.concurrency.of.per-partition.write | - | The maximum number of files that can be written concurrently to a single partition is determined. This value will only be respected by the remote shuffle server if it is greater than 0. [...] -| <client_type>.rss.client.rpc.timeout.ms | 60000 | Timeout in milliseconds for RPC calls. [...] +| <client_type>.rss.client.rpc.timeout.ms | 60000 | The timeout value in milliseconds for gRPC and Netty Type RPC Clients, including ShuffleServerClient and ShuffleManagerClient. [...] | <client_type>.rss.client.rpc.maxAttempts | 3 | When we fail to send RPC calls, we will retry for maxAttempts times. [...] | <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. [...] | <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. [...] diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java index abe3a9dfa..0d8e884d7 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleServerManagerTestBase.java @@ -23,7 +23,7 @@ import org.junit.jupiter.api.BeforeEach; import org.apache.uniffle.client.factory.ShuffleManagerClientFactory; import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient; import org.apache.uniffle.common.ClientType; -import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.GrpcServer; import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager; @@ -63,7 +63,7 @@ public class ShuffleServerManagerTestBase { shuffleManagerServer = createShuffleManagerServer(); shuffleManagerServer.start(); int port = shuffleManagerServer.getPort(); - long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS); + long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS); client = factory.createShuffleManagerClient(ClientType.GRPC, LOCALHOST, port, rpcTimeout); }