This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 89f07fb5 [#782]refactor: restrict rss.rpc.server.type to an enum (#783)
89f07fb5 is described below

commit 89f07fb587fd9e0f8bf593e562df20b35ad9c361
Author: advancedxy <[email protected]>
AuthorDate: Tue Apr 4 14:09:50 2023 +0800

    [#782]refactor: restrict rss.rpc.server.type to an enum (#783)
    
    ### What changes were proposed in this pull request?
    1. make RPC_SERVER_TYPE an enum
    2. remove ServerType definition in various places
    
    ### Why are the changes needed?
    This resolves #782
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs
---
 .../manager/ShuffleManagerServerFactory.java       |  9 +++-----
 .../manager/ShuffleManagerServerFactoryTest.java   |  5 +++--
 .../apache/uniffle/common/config/RssBaseConf.java  |  7 +++---
 .../org/apache/uniffle/common/rpc/ServerType.java  | 26 ++++++++++++++++++++++
 .../uniffle/coordinator/CoordinatorFactory.java    | 12 +++++-----
 .../uniffle/coordinator/CoordinatorConfTest.java   |  2 +-
 coordinator/src/test/resources/coordinator.conf    |  2 +-
 .../uniffle/server/ShuffleServerFactory.java       | 10 ++++-----
 .../uniffle/server/MockedShuffleServerFactory.java |  5 +++--
 .../uniffle/server/ShuffleServerConfTest.java      |  4 ++--
 10 files changed, 54 insertions(+), 28 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
index 5c395e52..5d0174e6 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -21,6 +21,7 @@ import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.rpc.GrpcServer;
+import org.apache.uniffle.common.rpc.ServerType;
 
 public class ShuffleManagerServerFactory {
   private final RssShuffleManagerInterface shuffleManager;
@@ -33,8 +34,8 @@ public class ShuffleManagerServerFactory {
   }
 
   public GrpcServer getServer() {
-    String type = conf.getString(RssBaseConf.RPC_SERVER_TYPE);
-    if (ServerType.GRPC.name().equals(type)) {
+    ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
+    if (type == ServerType.GRPC) {
       return GrpcServer.Builder.newBuilder()
           .conf(conf)
           .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
@@ -49,8 +50,4 @@ public class ShuffleManagerServerFactory {
   public RssBaseConf getConf() {
     return conf;
   }
-
-  enum ServerType {
-    GRPC
-  }
 }
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
index 5e287356..e17d4ef0 100644
--- 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.shuffle.manager;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.rpc.ServerType;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -28,13 +29,13 @@ public class ShuffleManagerServerFactoryTest {
   public void testShuffleManagerServerType() {
     // add code to generate tests that check the server type
     RssBaseConf conf = new RssBaseConf();
-    conf.set(RssBaseConf.RPC_SERVER_TYPE, "GRPC");
+    conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
     ShuffleManagerServerFactory factory = new 
ShuffleManagerServerFactory(null, conf);
     // this should execute normally;
     factory.getServer();
 
     // other types should raise an exception
-    conf.set(RssBaseConf.RPC_SERVER_TYPE, "Netty");
+    conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
     factory = new ShuffleManagerServerFactory(null, conf);
     assertThrows(UnsupportedOperationException.class, factory::getServer);
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 09322a62..8dd74d34 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.rpc.ServerType;
 
 public class RssBaseConf extends RssConf {
 
@@ -30,10 +31,10 @@ public class RssBaseConf extends RssConf {
       .noDefaultValue()
       .withDescription("Coordinator quorum");
 
-  public static final ConfigOption<String> RPC_SERVER_TYPE = ConfigOptions
+  public static final ConfigOption<ServerType> RPC_SERVER_TYPE = ConfigOptions
       .key("rss.rpc.server.type")
-      .stringType()
-      .defaultValue("GRPC")
+      .enumType(ServerType.class)
+      .defaultValue(ServerType.GRPC)
       .withDescription("Shuffle server type, default is grpc");
 
   public static final ConfigOption<Integer> RPC_SERVER_PORT = ConfigOptions
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java
new file mode 100644
index 00000000..4d72f79a
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/ServerType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.rpc;
+
+/**
+ * This should sync/match with how ClientType changes
+ */
+public enum ServerType {
+  GRPC,
+  GRPC_NETTY,
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
index 1a952d27..a419a43b 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator;
 
 import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
 
 public class CoordinatorFactory {
 
@@ -31,8 +32,11 @@ public class CoordinatorFactory {
   }
 
   public ServerInterface getServer() {
-    String type = conf.getString(CoordinatorConf.RPC_SERVER_TYPE);
-    if (type.equals(ServerType.GRPC.name())) {
+    ServerType type = conf.get(CoordinatorConf.RPC_SERVER_TYPE);
+    // Coordinator currently only has grpc support. However, we should support 
create grpc server
+    // even if the server type is GRPC_NETTY. Otherwise, we cannot use a 
unified configuration
+    // to start both coordinator and shuffle server
+    if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
       return GrpcServer.Builder.newBuilder()
           .conf(conf)
           .grpcMetrics(coordinatorServer.getGrpcMetrics())
@@ -41,8 +45,4 @@ public class CoordinatorFactory {
       throw new UnsupportedOperationException("Unsupported server type " + 
type);
     }
   }
-
-  enum ServerType {
-    GRPC
-  }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
index 3d06f79f..8f93f365 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java
@@ -33,7 +33,7 @@ public class CoordinatorConfTest {
 
     // test base conf
     assertEquals(9527, conf.getInteger(CoordinatorConf.RPC_SERVER_PORT));
-    assertEquals("testRpc", conf.getString(CoordinatorConf.RPC_SERVER_TYPE));
+    assertEquals("GRPC", conf.get(CoordinatorConf.RPC_SERVER_TYPE).name());
     assertEquals(9526, conf.getInteger(CoordinatorConf.JETTY_HTTP_PORT));
 
     // test coordinator specific conf
diff --git a/coordinator/src/test/resources/coordinator.conf 
b/coordinator/src/test/resources/coordinator.conf
index a29bfdfb..4c881e64 100644
--- a/coordinator/src/test/resources/coordinator.conf
+++ b/coordinator/src/test/resources/coordinator.conf
@@ -17,7 +17,7 @@
 
 # base conf
 rss.rpc.server.port 9527
-rss.rpc.server.type testRpc
+rss.rpc.server.type GRPC
 rss.jetty.http.port 9526
 
 # coordinator specific conf
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
index 8ba4adca..1464292f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
 
 import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
 
 public class ShuffleServerFactory {
 
@@ -31,8 +32,10 @@ public class ShuffleServerFactory {
   }
 
   public ServerInterface getServer() {
-    String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
-    if (type.equals(ServerType.GRPC.name())) {
+    ServerType type = conf.get(ShuffleServerConf.RPC_SERVER_TYPE);
+    // supports both grpc and grpc_netty, so coordinator and shuffle server 
could have unified
+    // configuration
+    if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
       return GrpcServer.Builder.newBuilder()
           .conf(conf)
           .grpcMetrics(shuffleServer.getGrpcMetrics())
@@ -53,7 +56,4 @@ public class ShuffleServerFactory {
     return conf;
   }
 
-  enum ServerType {
-    GRPC
-  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
index a43a7504..1945150d 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerFactory.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.rpc.ServerType;
 
 public class MockedShuffleServerFactory extends ShuffleServerFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(MockedShuffleServerFactory.class);
@@ -33,8 +34,8 @@ public class MockedShuffleServerFactory extends 
ShuffleServerFactory {
   public ServerInterface getServer() {
     ShuffleServerConf conf = getConf();
     ShuffleServer shuffleServer = getShuffleServer();
-    String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
-    if (type.equals(ServerType.GRPC.name())) {
+    ServerType type = conf.get(ShuffleServerConf.RPC_SERVER_TYPE);
+    if (type == ServerType.GRPC) {
       return new MockedGrpcServer(conf, new 
MockedShuffleServerGrpcService(shuffleServer),
         shuffleServer.getGrpcMetrics());
     } else {
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index f70ca342..ca707c79 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -35,7 +35,7 @@ public class ShuffleServerConfTest {
   public void defaultConfTest() {
     ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
     assertFalse(shuffleServerConf.loadConfFromFile(null));
-    assertEquals("GRPC", 
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+    assertEquals("GRPC", 
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
     assertEquals(256, 
shuffleServerConf.getInteger(ShuffleServerConf.JETTY_CORE_POOL_SIZE));
     assertEquals(0, 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD));
   }
@@ -62,7 +62,7 @@ public class ShuffleServerConfTest {
     assertEquals(2, 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
     assertEquals("value1", 
shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
     assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
-    assertEquals("GRPC", 
shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
+    assertEquals("GRPC", 
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE).name());
   }
 
   @Test

Reply via email to