This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 89f07fb5 [#782]refactor: restrict rss.rpc.server.type to an enum (#783)
89f07fb5 is described below
commit 89f07fb587fd9e0f8bf593e562df20b35ad9c361
Author: advancedxy <[email protected]>
AuthorDate: Tue Apr 4 14:09:50 2023 +0800
[#782]refactor: restrict rss.rpc.server.type to an enum (#783)
### What changes were proposed in this pull request?
1. make RPC_SERVER_TYPE an enum
2. remove ServerType definition in various places
### Why are the changes needed?
This resolves #782
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs
---
.../manager/ShuffleManagerServerFactory.java | 9 +++-----
.../manager/ShuffleManagerServerFactoryTest.java | 5 +++--
.../apache/uniffle/common/config/RssBaseConf.java | 7 +++---
.../org/apache/uniffle/common/rpc/ServerType.java | 26 ++++++++++++++++++++++
.../uniffle/coordinator/CoordinatorFactory.java | 12 +++++-----
.../uniffle/coordinator/CoordinatorConfTest.java | 2 +-
coordinator/src/test/resources/coordinator.conf | 2 +-
.../uniffle/server/ShuffleServerFactory.java | 10 ++++-----
.../uniffle/server/MockedShuffleServerFactory.java | 5 +++--
.../uniffle/server/ShuffleServerConfTest.java | 4 ++--
10 files changed, 54 insertions(+), 28 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
index 5c395e52..5d0174e6 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -21,6 +21,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.rpc.GrpcServer;
+import org.apache.uniffle.common.rpc.ServerType;
public class ShuffleManagerServerFactory {
private final RssShuffleManagerInterface shuffleManager;
@@ -33,8 +34,8 @@ public class ShuffleManagerServerFactory {
}
public GrpcServer getServer() {
- String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
- if (ServerType.GRPC.name().equals(type)) {
+ ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
+ if (type == ServerType.GRPC) {
return GrpcServer.Builder.newBuilder()
.conf(conf)
.grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
@@ -49,8 +50,4 @@ public class ShuffleManagerServerFactory {
public RssBaseConf getConf() {
return conf;
}
-
- enum ServerType {
- GRPC
- }
}
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
index 5e287356..e17d4ef0 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.shuffle.manager;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.rpc.ServerType;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -28,13 +29,13 @@ public class ShuffleManagerServerFactoryTest {
public void testShuffleManagerServerType() {
// add code to generate tests that check the server type
RssBaseConf conf = new RssBaseConf();
- conf.set(RssBaseConf.RPC_SERVER_TYPE, "GRPC");
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
ShuffleManagerServerFactory factory = new
ShuffleManagerServerFactory(null, conf);
// this should execute normally;
factory.getServer();
// other types should raise an exception
- conf.set(RssBaseConf.RPC_SERVER_TYPE, "Netty");
+ conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
factory = new ShuffleManagerServerFactory(null, conf);
assertThrows(UnsupportedOperationException.class, factory::getServer);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 09322a62..8dd74d34 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.rpc.ServerType;
public class RssBaseConf extends RssConf {
@@ -30,10 +31,10 @@ public class RssBaseConf extends RssConf {
.noDefaultValue()
.withDescription("Coordinator quorum");
- public static final ConfigOption<String> RPC_SERVER_TYPE = ConfigOptions
+ public static final ConfigOption<ServerType> RPC_SERVER_TYPE = ConfigOptions
.key("rss.rpc.server.type")
- .stringType()
- .defaultValue("GRPC")
+ .enumType(ServerType.class)
+ .defaultValue(ServerType.GRPC)
.withDescription("Shuffle server type, default is grpc");
public static final ConfigOption<Integer> RPC_SERVER_PORT = ConfigOptions
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java
b/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java
new file mode 100644
index 00000000..4d72f79a
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.rpc;
+
+/**
+ * This should sync/match with how ClientType changes
+ */
+public enum ServerType {
+ GRPC,
+ GRPC_NETTY,
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
index 1a952d27..a419a43b 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
public class CoordinatorFactory {
@@ -31,8 +32,11 @@ public class CoordinatorFactory {
}
public ServerInterface getServer() {
- String type = conf.getString(CoordinatorConf.RPC_SERVER_TYPE);
- if (type.equals(ServerType.GRPC.name())) {
+ ServerType type = conf.get(CoordinatorConf.RPC_SERVER_TYPE);
+ // Coordinator currently only has grpc support. However, we should support
create grpc server
+ // even if the server type is GRPC_NETTY. Otherwise, we cannot use a
unified configuration
+ // to start both coordinator and shuffle server
+ if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
return GrpcServer.Builder.newBuilder()
.conf(conf)
.grpcMetrics(coordinatorServer.getGrpcMetrics())
@@ -41,8 +45,4 @@ public class CoordinatorFactory {
throw new UnsupportedOperationException("Unsupported server type " +
type);
}
}
-
- enum ServerType {
- GRPC
- }
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index 3d06f79f..8f93f365 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -33,7 +33,7 @@ public class CoordinatorConfTest {
// test base conf
assertEquals(9527, conf.getInteger(CoordinatorConf.RPC_SERVER_PORT));
- assertEquals("testRpc", conf.getString(CoordinatorConf.RPC_SERVER_TYPE));
+ assertEquals("GRPC", conf.get(CoordinatorConf.RPC_SERVER_TYPE).name());
assertEquals(9526, conf.getInteger(CoordinatorConf.JETTY_HTTP_PORT));
// test coordinator specific conf
diff --git a/coordinator/src/test/resources/coordinator.conf
b/coordinator/src/test/resources/coordinator.conf
index a29bfdfb..4c881e64 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -17,7 +17,7 @@
# base conf
rss.rpc.server.port 9527
-rss.rpc.server.type testRpc
+rss.rpc.server.type GRPC
rss.jetty.http.port 9526
# coordinator specific conf
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
index 8ba4adca..1464292f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
public class ShuffleServerFactory {
@@ -31,8 +32,10 @@ public class ShuffleServerFactory {
}
public ServerInterface getServer() {
- String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
- if (type.equals(ServerType.GRPC.name())) {
+ ServerType type = conf.get(ShuffleServerConf.RPC_SERVER_TYPE);
+ // supports both grpc and grpc_netty, so coordinator and shuffle server
could have unified
+ // configuration
+ if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
return GrpcServer.Builder.newBuilder()
.conf(conf)
.grpcMetrics(shuffleServer.getGrpcMetrics())
@@ -53,7 +56,4 @@ public class ShuffleServerFactory {
return conf;
}
- enum ServerType {
- GRPC
- }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
index a43a7504..1945150d 100644
---
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
+++
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
public class MockedShuffleServerFactory extends ShuffleServerFactory {
private static final Logger LOG =
LoggerFactory.getLogger(MockedShuffleServerFactory.class);
@@ -33,8 +34,8 @@ public class MockedShuffleServerFactory extends
ShuffleServerFactory {
public ServerInterface getServer() {
ShuffleServerConf conf = getConf();
ShuffleServer shuffleServer = getShuffleServer();
- String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
- if (type.equals(ServerType.GRPC.name())) {
+ ServerType type = conf.get(ShuffleServerConf.RPC_SERVER_TYPE);
+ if (type == ServerType.GRPC) {
return new MockedGrpcServer(conf, new
MockedShuffleServerGrpcService(shuffleServer),
shuffleServer.getGrpcMetrics());
} else {
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index f70ca342..ca707c79 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -35,7 +35,7 @@ public class ShuffleServerConfTest {
public void defaultConfTest() {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
assertFalse(shuffleServerConf.loadConfFromFile(null));
- assertEquals("GRPC",
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+ assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
assertEquals(256,
shuffleServerConf.getInteger(ShuffleServerConf.JETTY_CORE_POOL_SIZE));
assertEquals(0,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD));
}
@@ -62,7 +62,7 @@ public class ShuffleServerConfTest {
assertEquals(2,
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1",
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
- assertEquals("GRPC",
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+ assertEquals("GRPC",
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
}
@Test