srkukarni closed pull request #3336: Added HTTP Support
URL: https://github.com/apache/pulsar/pull/3336
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
index 215bd344a6..1e799d8f58 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
@@ -18,20 +18,21 @@
  */
 package org.apache.pulsar.io.netty;
 
+import java.util.Map;
+
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.netty.server.NettyServer;
-import java.util.Map;
 
 /**
- * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and 
write to user-defined Pulsar topic
+ * A simple Netty Source connector to listen for incoming messages and write 
to user-defined Pulsar topic.
  */
 @Connector(
     name = "netty",
     type = IOType.SOURCE,
-    help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp 
messages and write to user-defined Pulsar topic",
+    help = "A simple Netty Source connector to listen for incoming messages 
and write to user-defined Pulsar topic",
     configClass = NettySourceConfig.class)
 public class NettySource extends PushSource<byte[]> {
 
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
index f5d40e930a..1ef4c35a27 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
@@ -20,17 +20,22 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import lombok.*;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
 /**
- * Netty Tcp or Udp Source Connector Config.
+ * Netty Source Connector Config.
  */
 @Data
 @Setter
@@ -45,7 +50,7 @@
     @FieldDoc(
             required = true,
             defaultValue = "tcp",
-            help = "The tcp or udp network protocols")
+            help = "The network protocol to use, supported values are 'tcp', 
'udp', and 'http'")
     private String type = "tcp";
 
     @FieldDoc(
@@ -63,8 +68,8 @@
     @FieldDoc(
             required = true,
             defaultValue = "1",
-            help = "The number of threads of Netty Tcp Server to accept 
incoming connections and " +
-                    "handle the traffic of the accepted connections")
+            help = "The number of threads of Netty Tcp Server to accept 
incoming connections and "
+                    + "handle the traffic of the accepted connections")
     private int numberOfThreads = 1;
 
     public static NettySourceConfig load(Map<String, Object> map) throws 
IOException {
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
new file mode 100644
index 0000000000..dce26fbe6c
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.http;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
+
+/**
+ * Netty Channel Initializer to register HTTP decoder and handler.
+ */
+public class NettyHttpChannelInitializer extends 
ChannelInitializer<SocketChannel> {
+
+    private final SslContext sslCtx;
+    private ChannelInboundHandlerAdapter handler;
+
+    public NettyHttpChannelInitializer(ChannelInboundHandlerAdapter handler, 
SslContext sslCtx) {
+        this.handler = handler;
+        this.sslCtx = sslCtx;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        if (sslCtx != null) {
+            
socketChannel.pipeline().addLast(sslCtx.newHandler(socketChannel.alloc()));
+        }
+        socketChannel.pipeline().addLast(new HttpServerCodec());
+        socketChannel.pipeline().addLast(this.handler);
+    }
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
new file mode 100644
index 0000000000..ef4cf8c28e
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.http;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import lombok.Data;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.netty.NettySource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles a server-side HTTP channel.
+ */
+@ChannelHandler.Sharable
+public class NettyHttpServerHandler extends 
SimpleChannelInboundHandler<Object> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyHttpServerHandler.class);
+
+    private NettySource nettySource;
+
+    public NettyHttpServerHandler(NettySource nettySource) {
+        this.nettySource = nettySource;
+    }
+
+    private HttpRequest request;
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+
+        if (msg instanceof HttpRequest) {
+            HttpRequest request = this.request = (HttpRequest) msg;
+
+            if (HttpUtil.is100ContinueExpected(request)) {
+                send100Continue(ctx);
+            }
+        }
+
+        if (msg instanceof HttpContent) {
+            HttpContent httpContent = (HttpContent) msg;
+
+            ByteBuf content = httpContent.content();
+            if (content.isReadable()) {
+                nettySource.consume(new 
NettyHttpRecord(Optional.ofNullable(""),
+                        content.toString(CharsetUtil.UTF_8).getBytes()));
+            }
+
+            if (msg instanceof LastHttpContent) {
+                LastHttpContent trailer = (LastHttpContent) msg;
+
+                if (!writeResponse(trailer, ctx)) {
+                    // If keep-alive is off, close the connection once the 
content is fully written.
+                    
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                }
+            }
+        }
+    }
+
+    private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext 
ctx) {
+        // Decide whether to close the connection or not.
+        boolean keepAlive = HttpUtil.isKeepAlive(request);
+        // Build the response object.
+        FullHttpResponse response = new DefaultFullHttpResponse(
+                HttpVersion.HTTP_1_1,
+                currentObj.decoderResult().isSuccess() ? HttpResponseStatus.OK 
: HttpResponseStatus.BAD_REQUEST,
+                Unpooled.EMPTY_BUFFER);
+
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; 
charset=UTF-8");
+
+        if (keepAlive) {
+            // Add 'Content-Length' header only for a keep-alive connection.
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
+            // Add keep alive header as per:
+            // - 
http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
+            response.headers().set(HttpHeaderNames.CONNECTION, 
HttpHeaderValues.KEEP_ALIVE);
+        }
+
+        // Write the response.
+        ctx.write(response);
+
+        return keepAlive;
+    }
+
+    private static void send100Continue(ChannelHandlerContext ctx) {
+        FullHttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
+        ctx.write(response);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        logger.error("Error when processing incoming data", cause);
+        ctx.close();
+    }
+
+    @Data
+    static private class NettyHttpRecord implements Record<byte[]>, 
Serializable {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
new file mode 100644
index 0000000000..2e71305b28
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.http;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java
new file mode 100644
index 0000000000..482e05ac88
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
index b5fa8209ff..b9a7b4c825 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
@@ -24,7 +24,7 @@
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
- * Netty Channel Initializer to register decoder and handler
+ * Netty Channel Initializer to register decoder and handler.
  */
 public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
 
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 2cb803153a..775b6f4d42 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.io.netty.server;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -29,11 +32,13 @@
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Netty Tcp or Udp Server to accept any incoming data through Tcp.
+ * Netty Server to accept incoming data via the configured type.
  */
 public class NettyServer {
 
@@ -58,28 +63,32 @@ private NettyServer(Builder builder) {
     public void run() {
         try {
             switch (type) {
-                case TCP:
-                    runTcp();
-                    break;
                 case UDP:
                     runUdp();
                     break;
+                case HTTP:
+                    runHttp();
+                    break;
+                case TCP:
                 default:
                     runTcp();
                     break;
             }
-        } catch(Exception ex) {
-            logger.error("Error occurred when Netty Tcp or Udp Server is 
running", ex);
+        } catch (Exception ex) {
+            logger.error("Error occurred when Netty Server is running", ex);
         } finally {
             shutdownGracefully();
         }
     }
 
     public void shutdownGracefully() {
-        if (workerGroup != null)
+        if (workerGroup != null) {
             workerGroup.shutdownGracefully();
-        if (bossGroup != null)
+        }
+
+        if (bossGroup != null) {
             bossGroup.shutdownGracefully();
+        }
     }
 
     private void runUdp() throws InterruptedException {
@@ -95,17 +104,32 @@ private void runUdp() throws InterruptedException {
     }
 
     private void runTcp() throws InterruptedException {
+        ServerBootstrap serverBootstrap = getServerBootstrap(
+                new NettyChannelInitializer(new 
NettyServerHandler(this.nettySource)));
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private void runHttp() throws InterruptedException {
+        ServerBootstrap serverBootstrap = getServerBootstrap(
+                new NettyHttpChannelInitializer(new 
NettyHttpServerHandler(this.nettySource), null));
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private ServerBootstrap getServerBootstrap(ChannelHandler childHandler) {
         bossGroup = new NioEventLoopGroup(this.numberOfThreads);
         workerGroup = new NioEventLoopGroup(this.numberOfThreads);
         ServerBootstrap serverBootstrap = new ServerBootstrap();
         serverBootstrap.group(bossGroup, workerGroup);
         serverBootstrap.channel(NioServerSocketChannel.class);
-        serverBootstrap.childHandler(new NettyChannelInitializer(new 
NettyServerHandler(this.nettySource)))
-                .option(ChannelOption.SO_BACKLOG, 1024)
-                .childOption(ChannelOption.SO_KEEPALIVE, true);
+        serverBootstrap.childHandler(childHandler)
+        .option(ChannelOption.SO_BACKLOG, 1024)
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
 
-        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
-        channelFuture.channel().closeFuture().sync();
+        return serverBootstrap;
     }
 
     /**
@@ -145,11 +169,11 @@ public Builder setNumberOfThreads(int numberOfThreads) {
         }
 
         public NettyServer build() {
-            Preconditions.checkNotNull(this.type, "type cannot be blank/null");
-            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host 
cannot be blank/null");
-            Preconditions.checkArgument(this.port >= 1024, "port must be set 
equal or bigger than 1024");
-            Preconditions.checkNotNull(this.nettySource, "nettySource must be 
set");
-            Preconditions.checkArgument(this.numberOfThreads > 0,
+            checkNotNull(this.type, "type cannot be blank/null");
+            checkArgument(StringUtils.isNotBlank(host), "host cannot be 
blank/null");
+            checkArgument(this.port >= 1024, "port must be set equal or bigger 
than 1024");
+            checkNotNull(this.nettySource, "nettySource must be set");
+            checkArgument(this.numberOfThreads > 0,
                     "numberOfThreads must be set as positive");
 
             return new NettyServer(this);
@@ -157,13 +181,11 @@ public NettyServer build() {
     }
 
     /**
-     * tcp or udp network protocol
+     * Network protocol.
      */
     public enum Type {
-
         TCP,
-
-        UDP
+        UDP,
+        HTTP
     }
-
 }
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
index 81fd2037ac..42f497e631 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
@@ -18,18 +18,21 @@
  */
 package org.apache.pulsar.io.netty.server;
 
-import io.netty.channel.*;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.io.Serializable;
+import java.util.Optional;
+
 import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.Optional;
-
 /**
- * Handles a server-side channel
+ * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
 public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
new file mode 100644
index 0000000000..5a1ab94659
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.server;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
new file mode 100644
index 0000000000..eae087be49
--- /dev/null
+++ 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.http;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.pulsar.io.netty.NettySource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the Netty HTTP Channel Initializer.
+ */
+public class NettyHttpChannelInitializerTest {
+
+    @Test
+    public void testChannelInitializer() throws Exception {
+        NioSocketChannel channel = new NioSocketChannel();
+
+        NettyHttpChannelInitializer nettyChannelInitializer = new 
NettyHttpChannelInitializer(
+                new NettyHttpServerHandler(new NettySource()), null);
+        nettyChannelInitializer.initChannel(channel);
+
+        assertNotNull(channel.pipeline().toMap());
+        assertEquals(2, channel.pipeline().toMap().size());
+    }
+
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 05a43dc40b..c81ee984f6 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,4 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
-- [Netty Tcp or Udp Source Connector](io-netty.md#source)
+- [Netty Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-netty.md b/site2/docs/io-netty.md
index 479752f9ad..4b3d16bb0c 100644
--- a/site2/docs/io-netty.md
+++ b/site2/docs/io-netty.md
@@ -6,7 +6,7 @@ sidebar_label: Netty Tcp or Udp Connector
 
 ## Source
 
-The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from 
Tcp/Udp Client and write them to user-defined Pulsar topic.
+The Netty Source connector opens a port that accepts incoming data via the 
configured network protocol and publishes it to a user-defined Pulsar topic.
 Also, this connector is suggested to be used in a containerized (e.g. k8s) 
deployment.
 Otherwise, if the connector is running in process or thread mode, the 
instances may be conflicting on listening to ports.
 
@@ -14,7 +14,7 @@ Otherwise, if the connector is running in process or thread 
mode, the instances
 
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
-| `type` | `false` | `tcp` | The tcp or udp network protocol required by 
netty. |
+| `type` | `false` | `tcp` | The network protocol over which data is 
transmitted to netty. Valid values include HTTP, TCP, and UDP. |
 | `host` | `false` | `127.0.0.1` | The host name or address that the source 
instance to listen on. |
 | `port` | `false` | `10999` | The port that the source instance to listen on. 
|
 | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp 
Server to accept incoming connections and handle the traffic of the accepted 
connections. |


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to