This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b52449a [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829) b52449a is described below commit b52449a6604cbdc4fe70bdfd8ecc9456d6b42ece Author: gbensa <58692163+gbe...@users.noreply.github.com> AuthorDate: Sat Dec 14 03:40:39 2019 +0100 [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829) ### Motivation UDP protocol is not working for netty connector ### Modifications Added a specific handler for UDP and use Channel instead of SocketChannel in NettyChannelInitializer Successfully tested * Fixes for UDP protocol support in netty connector * Added specific handlers for UDP and TCP netty connector --- .../apache/pulsar/io/netty/server/NettyServer.java | 21 +++++++++------ .../NettyTCPChannelInitializer.java} | 14 +++++----- .../NettyTCPServerHandler.java} | 22 ++++++++-------- .../package-info.java} | 26 +------------------ .../NettyUDPChannelInitializer.java} | 14 +++++----- .../NettyUDPServerHandler.java} | 30 ++++++++++++---------- .../package-info.java} | 26 +------------------ ...tyChannelInitializer.java => package-info.java} | 25 ------------------ .../NettyTCPChannelInitializerTest.java} | 21 +++++++++------ .../NettyUDPChannelInitializerTest.java} | 21 +++++++-------- 10 files changed, 79 insertions(+), 141 deletions(-) diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java index 775b6f4..60c9c2d 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java @@ -21,6 +21,17 @@ package org.apache.pulsar.io.netty.server; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.netty.NettySource; +import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer; +import org.apache.pulsar.io.netty.http.NettyHttpServerHandler; +import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer; +import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler; +import org.apache.pulsar.io.netty.udp.NettyUDPChannelInitializer; +import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -30,12 +41,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.io.netty.NettySource; -import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer; -import org.apache.pulsar.io.netty.http.NettyHttpServerHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Netty Server to accept incoming data via the configured type. @@ -96,7 +101,7 @@ public class NettyServer { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); bootstrap.channel(NioDatagramChannel.class); - bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource))) + bootstrap.handler(new NettyUDPChannelInitializer(new NettyUDPServerHandler(this.nettySource))) .option(ChannelOption.SO_BACKLOG, 1024); ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync(); @@ -105,7 +110,7 @@ public class NettyServer { private void runTcp() throws InterruptedException { ServerBootstrap serverBootstrap = getServerBootstrap( - new NettyChannelInitializer(new NettyServerHandler(this.nettySource))); + new NettyTCPChannelInitializer(new NettyTCPServerHandler(this.nettySource))); ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync(); channelFuture.channel().closeFuture().sync(); diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java similarity index 72% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java index b9a7b4c..1f4972b 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java @@ -16,28 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.tcp; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; +import io.netty.channel.Channel; import io.netty.handler.codec.bytes.ByteArrayDecoder; /** * Netty Channel Initializer to register decoder and handler. */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { +public class NettyTCPChannelInitializer extends ChannelInitializer<Channel> { private ChannelInboundHandlerAdapter handler; - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { + public NettyTCPChannelInitializer(ChannelInboundHandlerAdapter handler) { this.handler = handler; } @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); + protected void initChannel(Channel channel) throws Exception { + channel.pipeline().addLast(new ByteArrayDecoder()); + channel.pipeline().addLast(this.handler); } } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java similarity index 82% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java index 42f497e..a619bc3 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java @@ -16,38 +16,38 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +package org.apache.pulsar.io.netty.tcp; import java.io.Serializable; import java.util.Optional; -import lombok.Data; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.netty.NettySource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.Data; + /** * Handles a server-side channel. */ @ChannelHandler.Sharable -public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> { +public class NettyTCPServerHandler extends SimpleChannelInboundHandler<byte[]> { - private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); + private static final Logger logger = LoggerFactory.getLogger(NettyTCPServerHandler.class); private NettySource nettySource; - public NettyServerHandler(NettySource nettySource) { + public NettyTCPServerHandler(NettySource nettySource) { this.nettySource = nettySource; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception { - nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes)); + nettySource.consume(new NettyTCPRecord(Optional.ofNullable(""), bytes)); } @Override @@ -57,7 +57,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> { } @Data - static private class NettyRecord implements Record<byte[]>, Serializable { + static private class NettyTCPRecord implements Record<byte[]>, Serializable { private final Optional<String> key; private final byte[] value; } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java index b9a7b4c..b59d614 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java @@ -16,28 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler. - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} +package org.apache.pulsar.io.netty.tcp; \ No newline at end of file diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java similarity index 68% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java index b9a7b4c..bc86559 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java @@ -16,28 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.udp; +import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; /** * Netty Channel Initializer to register decoder and handler. */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { +public class NettyUDPChannelInitializer extends ChannelInitializer<Channel> { private ChannelInboundHandlerAdapter handler; - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { + public NettyUDPChannelInitializer(ChannelInboundHandlerAdapter handler) { this.handler = handler; } @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); + protected void initChannel(Channel channel) throws Exception { + channel.pipeline().addLast(this.handler); } } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java similarity index 75% rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java index 42f497e..8341628 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java @@ -16,38 +16,40 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +package org.apache.pulsar.io.netty.udp; import java.io.Serializable; import java.util.Optional; -import lombok.Data; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.netty.NettySource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; +import lombok.Data; + /** * Handles a server-side channel. */ @ChannelHandler.Sharable -public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> { - - private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); +public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { + private static final Logger logger = LoggerFactory.getLogger(NettyUDPServerHandler.class); private NettySource nettySource; - public NettyServerHandler(NettySource nettySource) { + public NettyUDPServerHandler(NettySource nettySource) { this.nettySource = nettySource; } - + @Override - protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception { - nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes)); + protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception { + byte[] bytes = ByteBufUtil.getBytes(packet.content()); + nettySource.consume(new NettyUDPRecord(Optional.ofNullable(""), bytes)); } @Override @@ -57,7 +59,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> { } @Data - static private class NettyRecord implements Record<byte[]>, Serializable { + static private class NettyUDPRecord implements Record<byte[]>, Serializable { private final Optional<String> key; private final byte[] value; } diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java similarity index 50% copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java index b9a7b4c..d936c6a 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java @@ -16,28 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler. - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} +package org.apache.pulsar.io.netty.udp; \ No newline at end of file diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/package-info.java similarity index 50% rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java rename to pulsar-io/netty/src/main/java/package-info.java index b9a7b4c..51da6c0 100644 --- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java +++ b/pulsar-io/netty/src/main/java/package-info.java @@ -16,28 +16,3 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.bytes.ByteArrayDecoder; - -/** - * Netty Channel Initializer to register decoder and handler. - */ -public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { - - private ChannelInboundHandlerAdapter handler; - - public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) { - this.handler = handler; - } - - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new ByteArrayDecoder()); - socketChannel.pipeline().addLast(this.handler); - } - -} diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java similarity index 73% copy from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java index 7fb3b45..366c7b1 100644 --- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java @@ -16,30 +16,35 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.tcp; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.pulsar.io.netty.NettySource; +import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer; +import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler; +import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioSocketChannel; /** * Tests for Netty Channel Initializer */ -public class NettyChannelInitializerTest { +public class NettyTCPChannelInitializerTest { @Test public void testChannelInitializer() throws Exception { NioSocketChannel channel = new NioSocketChannel(); - NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer( - new NettyServerHandler(new NettySource())); + NettyTCPChannelInitializer nettyChannelInitializer = new NettyTCPChannelInitializer( + new NettyTCPServerHandler(new NettySource())); nettyChannelInitializer.initChannel(channel); assertNotNull(channel.pipeline().toMap()); assertEquals(2, channel.pipeline().toMap().size()); } - + } diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java similarity index 74% rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java index 7fb3b45..33d4940 100644 --- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java +++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java @@ -16,30 +16,31 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.io.netty.server; +package org.apache.pulsar.io.netty.udp; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.pulsar.io.netty.NettySource; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; +import io.netty.channel.socket.nio.NioDatagramChannel; /** * Tests for Netty Channel Initializer */ -public class NettyChannelInitializerTest { +public class NettyUDPChannelInitializerTest { @Test public void testChannelInitializer() throws Exception { - NioSocketChannel channel = new NioSocketChannel(); + NioDatagramChannel channel = new NioDatagramChannel(); - NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer( - new NettyServerHandler(new NettySource())); + NettyUDPChannelInitializer nettyChannelInitializer = new NettyUDPChannelInitializer( + new NettyUDPServerHandler(new NettySource())); nettyChannelInitializer.initChannel(channel); assertNotNull(channel.pipeline().toMap()); - assertEquals(2, channel.pipeline().toMap().size()); + assertEquals(1, channel.pipeline().toMap().size()); } - + }