This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 46f3f7e4 [#133] feat(netty): Add StreamServer. (#718)
46f3f7e4 is described below

commit 46f3f7e43ca9d432495823380bb5ceff4edcc427
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Mar 15 20:17:09 2023 +0800

    [#133] feat(netty): Add StreamServer. (#718)
    
    ### What changes were proposed in this pull request?
    
    Add a Netty-based StreamServer as a step toward replacing gRPC with Netty.
    
    ### Why are the changes needed?
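    Replacing gRPC with Netty (#133) first requires a Netty server on the shuffle server side; this change adds it as StreamServer.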
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Unit test: `ShuffleServerTest#nettyServerTest`.
    
    Co-authored-by: leixianming <[email protected]>
---
 server/pom.xml                                     |   5 +
 .../org/apache/uniffle/server/ShuffleServer.java   |  13 ++
 .../apache/uniffle/server/ShuffleServerConf.java   |  55 ++++++++
 .../apache/uniffle/server/netty/StreamServer.java  | 142 +++++++++++++++++++++
 .../netty/decoder/StreamServerInitDecoder.java     |  48 +++++++
 .../apache/uniffle/server/ShuffleServerTest.java   |  22 ++++
 6 files changed, 285 insertions(+)

diff --git a/server/pom.xml b/server/pom.xml
index 582aee83..97907a1e 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -107,6 +107,11 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index a117dba0..3a47a81d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -51,6 +51,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.web.CommonMetricsServlet;
 import org.apache.uniffle.common.web.JettyServer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.netty.StreamServer;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 import org.apache.uniffle.storage.util.StorageType;
@@ -92,6 +93,8 @@ public class ShuffleServer {
   private volatile boolean running;
   private ExecutorService executorService;
   private Future<?> decommissionFuture;
+  private boolean nettyServerEnabled;
+  private StreamServer streamServer;
 
   public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
     this.shuffleServerConf = shuffleServerConf;
@@ -124,6 +127,9 @@ public class ShuffleServer {
     registerHeartBeat.startHeartBeat();
     jettyServer.start();
     server.start();
+    if (nettyServerEnabled) {
+      streamServer.start();
+    }
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
@@ -164,6 +170,9 @@ public class ShuffleServer {
     }
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
+    if (nettyServerEnabled && streamServer != null) {
+      streamServer.stop();
+    }
     if (executorService != null) {
       executorService.shutdownNow();
     }
@@ -221,6 +230,10 @@ public class ShuffleServer {
     shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, 
shuffleFlushManager);
     shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf, 
shuffleFlushManager,
         shuffleBufferManager, storageManager);
+    nettyServerEnabled = 
shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
+    if (nettyServerEnabled) {
+      streamServer = new StreamServer(this);
+    }
 
     setServer();
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 10b0d2ad..f4d26749 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -404,6 +404,61 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(-1)
       .withDescription("Shuffle netty server port");
 
+  /** Whether StreamServer uses epoll event loop groups and server channels instead of NIO ones. */
+  public static final ConfigOption<Boolean> NETTY_SERVER_EPOLL_ENABLE = ConfigOptions
+      .key("rss.server.netty.epoll.enable")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("If enable epoll model with netty server");
+
+  /** Number of threads in StreamServer's boss (accept) event loop group. */
+  public static final ConfigOption<Integer> NETTY_SERVER_ACCEPT_THREAD = ConfigOptions
+      .key("rss.server.netty.accept.thread")
+      .intType()
+      .defaultValue(10)
+      .withDescription("Accept thread count in netty");
+
+  /** Number of threads in StreamServer's worker event loop group. */
+  public static final ConfigOption<Integer> NETTY_SERVER_WORKER_THREAD = ConfigOptions
+      .key("rss.server.netty.worker.thread")
+      .intType()
+      .defaultValue(100)
+      .withDescription("Worker thread count in netty");
+
+  /** Idle timeout for netty handlers; presumably milliseconds given the 60000 default — TODO confirm. */
+  public static final ConfigOption<Long> SERVER_NETTY_HANDLER_IDLE_TIMEOUT = ConfigOptions
+      .key("rss.server.netty.handler.idle.timeout")
+      .longType()
+      .defaultValue(60000L)
+      .withDescription("Idle timeout if there is no data");
+
+  /** Requested listen backlog for StreamServer's server socket. */
+  public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_BACKLOG = ConfigOptions
+      .key("rss.server.netty.connect.backlog")
+      .intType()
+      .defaultValue(0)
+      .withDescription("For netty server, requested maximum length of the queue of incoming connections. "
+          + "A value of 0 (the default) does not mean 'no backlog'; the transport or OS then decides "
+          + "the queue length.");
+
+  /** Passed to netty as CONNECT_TIMEOUT_MILLIS, so the value is in milliseconds. */
+  public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_TIMEOUT = ConfigOptions
+      .key("rss.server.netty.connect.timeout")
+      .intType()
+      .defaultValue(5000)
+      .withDescription("Timeout for connection in netty");
+
+  /** SO_SNDBUF for accepted connections; StreamServer applies it only when positive. */
+  public static final ConfigOption<Integer> NETTY_SERVER_SEND_BUF = ConfigOptions
+      .key("rss.server.netty.send.buf")
+      .intType()
+      .defaultValue(0)
+      .withDescription("the optimal size for send buffer(SO_SNDBUF) "
+          + "should be latency * network_bandwidth. Assuming latency = 1ms, "
+          + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
+          + "Default is 0, OS will dynamically adjust the buf size.");
+
+  /** SO_RCVBUF for accepted connections; StreamServer applies it only when positive. */
+  public static final ConfigOption<Integer> NETTY_SERVER_RECEIVE_BUF = ConfigOptions
+      .key("rss.server.netty.receive.buf")
+      .intType()
+      .defaultValue(0)
+      .withDescription("the optimal size for receive buffer(SO_RCVBUF) "
+          + "should be latency * network_bandwidth. Assuming latency = 1ms, "
+          + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
+          + "Default is 0, OS will dynamically adjust the buf size.");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java 
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
new file mode 100644
index 00000000..8423515b
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.netty.decoder.StreamServerInitDecoder;
+
+/**
+ * Netty-based stream server of the shuffle server.
+ *
+ * <p>The event loop groups are created in the constructor, using epoll when
+ * {@code rss.server.netty.epoll.enable} is set and NIO otherwise. {@link #start()} binds
+ * {@code rss.server.netty.port}. {@link #stop()} closes the listening channel and shuts the
+ * groups down.
+ */
+public class StreamServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamServer.class);
+
+  private final ShuffleServer shuffleServer;
+  private final ShuffleServerConf shuffleServerConf;
+  private EventLoopGroup shuffleBossGroup;
+  private EventLoopGroup shuffleWorkerGroup;
+  private ChannelFuture channelFuture;
+
+  /**
+   * Creates the server and its boss/worker event loop groups; nothing is bound until {@link #start()}.
+   *
+   * @param shuffleServer owning shuffle server, whose configuration supplies all netty settings
+   */
+  public StreamServer(ShuffleServer shuffleServer) {
+    this.shuffleServer = shuffleServer;
+    this.shuffleServerConf = shuffleServer.getShuffleServerConf();
+    boolean isEpollEnable = shuffleServerConf.getBoolean(ShuffleServerConf.NETTY_SERVER_EPOLL_ENABLE);
+    int acceptThreads = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_ACCEPT_THREAD);
+    int workerThreads = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_WORKER_THREAD);
+    if (isEpollEnable) {
+      shuffleBossGroup = new EpollEventLoopGroup(acceptThreads);
+      shuffleWorkerGroup = new EpollEventLoopGroup(workerThreads);
+    } else {
+      shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
+      shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
+    }
+  }
+
+  /**
+   * Builds a {@link ServerBootstrap} for the given groups and socket settings.
+   *
+   * @param bossGroup group accepting connections; its type selects the epoll or NIO server channel
+   * @param workerGroup group serving accepted connections
+   * @param backlogSize listen backlog; only applied when positive
+   * @param timeoutMillis value for CONNECT_TIMEOUT_MILLIS on server and child channels
+   * @param sendBuf SO_SNDBUF for child channels; only applied when positive
+   * @param receiveBuf SO_RCVBUF for child channels; only applied when positive
+   * @param handlerSupplier called once per accepted connection to build its pipeline handlers
+   * @return the configured, not yet bound, bootstrap
+   */
+  private ServerBootstrap bootstrapChannel(
+      EventLoopGroup bossGroup,
+      EventLoopGroup workerGroup,
+      int backlogSize,
+      int timeoutMillis,
+      int sendBuf,
+      int receiveBuf,
+      Supplier<ChannelHandler[]> handlerSupplier) {
+    ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup);
+    if (bossGroup instanceof EpollEventLoopGroup) {
+      serverBootstrap.channel(EpollServerSocketChannel.class);
+    } else {
+      serverBootstrap.channel(NioServerSocketChannel.class);
+    }
+
+    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          public void initChannel(final SocketChannel ch) {
+            // The supplier hands out fresh handler instances for every connection.
+            ch.pipeline().addLast(handlerSupplier.get());
+          }
+        })
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
+        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
+        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .childOption(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+    // Only override netty's defaults for explicitly configured positive values. Forwarding a
+    // backlog of 0 does not mean "no backlog". The JDK substitutes its own default for NIO,
+    // while the epoll transport passes 0 to listen(2) unchanged.
+    if (backlogSize > 0) {
+      serverBootstrap.option(ChannelOption.SO_BACKLOG, backlogSize);
+    }
+    if (sendBuf > 0) {
+      serverBootstrap.childOption(ChannelOption.SO_SNDBUF, sendBuf);
+    }
+    if (receiveBuf > 0) {
+      serverBootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBuf);
+    }
+    return serverBootstrap;
+  }
+
+  /**
+   * Binds the server to {@code rss.server.netty.port} and blocks until the bind completes.
+   * Any exception raised while binding is passed to {@link ExitUtils#terminate} with status 1.
+   */
+  public void start() {
+    Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[] {
+        new StreamServerInitDecoder()
+    };
+    ServerBootstrap serverBootstrap = bootstrapChannel(shuffleBossGroup, shuffleWorkerGroup,
+        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_BACKLOG),
+        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_TIMEOUT),
+        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_SEND_BUF),
+        shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_RECEIVE_BUF),
+        streamHandlers);
+
+    // Keep the bind future so that stop() can close the listening channel later.
+    int port = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
+    try {
+      channelFuture = serverBootstrap.bind(port);
+      channelFuture.syncUninterruptibly();
+      LOG.info("bind localAddress is {}", channelFuture.channel().localAddress());
+      LOG.info("Start stream server successfully with port {}", port);
+    } catch (Exception e) {
+      ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+    }
+  }
+
+  /**
+   * Closes the listening channel, waiting up to 10 seconds, and then asks both event loop groups
+   * to shut down gracefully (without waiting for them). Repeated calls are no-ops.
+   */
+  public void stop() {
+    if (channelFuture != null) {
+      channelFuture.channel().close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
+      channelFuture = null;
+    }
+    if (shuffleBossGroup != null) {
+      shuffleBossGroup.shutdownGracefully();
+      shuffleBossGroup = null;
+    }
+    if (shuffleWorkerGroup != null) {
+      shuffleWorkerGroup.shutdownGracefully();
+      shuffleWorkerGroup = null;
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
new file mode 100644
index 00000000..d2551e55
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty.decoder;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * First handler installed on every stream-server connection.
+ *
+ * <p>It peeks at the first byte of the inbound stream without consuming it and hands that byte to
+ * {@link #addDecoder}, which is currently an empty placeholder.
+ *
+ * <p>NOTE(review): nothing is consumed and this handler never removes itself, so inbound bytes
+ * remain in the decoder's cumulation buffer until the connection closes. Revisit once
+ * {@code addDecoder} is implemented.
+ */
+public class StreamServerInitDecoder extends ByteToMessageDecoder {
+
+  public StreamServerInitDecoder() {
+  }
+
+  /** Placeholder for installing the decoder that matches {@code type}; currently does nothing. */
+  private void addDecoder(ChannelHandlerContext ctx, byte type) {
+  }
+
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
+    // Nothing to inspect until at least one byte has arrived.
+    if (in.readableBytes() >= Byte.BYTES) {
+      // Absolute read: peeks at the first byte while leaving the reader index untouched.
+      byte magicByte = in.getByte(in.readerIndex());
+      addDecoder(ctx, magicByte);
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 6dc5e56e..a2cc0895 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -134,4 +134,26 @@ public class ShuffleServerTest {
     serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
     return serverConf;
   }
+
+  /**
+   * Starts one server holding netty port 29999. It then checks that a second server configured
+   * with the same netty port fails in StreamServer#start with status 1 and the expected message.
+   */
+  @Test
+  public void nettyServerTest() throws Exception {
+    ShuffleServerConf serverConf = createShuffleServerConf();
+    serverConf.set(ShuffleServerConf.NETTY_SERVER_PORT, 29999);
+    ShuffleServer ss1 = new ShuffleServer(serverConf);
+    ss1.start();
+    // Make ExitUtils.terminate throw ExitException instead of exiting the JVM.
+    ExitUtils.disableSystemExit();
+    // Move the gRPC and HTTP ports so that the expected failure comes from the netty port.
+    serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19997);
+    serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19996);
+    ShuffleServer ss2 = new ShuffleServer(serverConf);
+    String expectMessage = "Fail to start stream server";
+    final int expectStatus = 1;
+    // NOTE(review): ss1 and ss2 are never stopped, so their ports stay bound for the rest of the
+    // test JVM; consider stopping them once this is confirmed not to disturb other tests.
+    try {
+      ss2.start();
+    } catch (ExitException e) {
+      // Catch only the expected type; anything else propagates with its real cause.
+      assertEquals(expectMessage, e.getMessage());
+      assertEquals(expectStatus, e.getStatus());
+      return;
+    }
+    fail("Expected ss2 to fail binding the netty port already held by ss1");
+  }
 }

Reply via email to