This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec205a  Added HTTP Support (#3336)
7ec205a is described below

commit 7ec205ad5733bbbc9d99da16ed4721221bb5b315
Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com>
AuthorDate: Fri Jan 11 13:27:36 2019 -0800

    Added HTTP Support (#3336)
    
    * Added HTTP Support
    
    * Updated documentation and removed duplicate code
    
    * Added unit test for NettyHttpChannelInitializer
---
 .../org/apache/pulsar/io/netty/NettySource.java    |   7 +-
 .../apache/pulsar/io/netty/NettySourceConfig.java  |  19 ++-
 .../NettyHttpChannelInitializer.java}              |  19 ++-
 .../io/netty/http/NettyHttpServerHandler.java      | 144 +++++++++++++++++++++
 .../package-info.java}                             |  26 +---
 ...tyChannelInitializer.java => package-info.java} |  26 +---
 .../io/netty/server/NettyChannelInitializer.java   |   2 +-
 .../apache/pulsar/io/netty/server/NettyServer.java |  70 ++++++----
 .../pulsar/io/netty/server/NettyServerHandler.java |  13 +-
 ...tyChannelInitializer.java => package-info.java} |  26 +---
 .../http/NettyHttpChannelInitializerTest.java}     |  32 ++---
 site2/docs/io-connectors.md                        |   2 +-
 site2/docs/io-netty.md                             |   4 +-
 13 files changed, 250 insertions(+), 140 deletions(-)

diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
index 215bd34..1e799d8 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
@@ -18,20 +18,21 @@
  */
 package org.apache.pulsar.io.netty;
 
+import java.util.Map;
+
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 import org.apache.pulsar.io.netty.server.NettyServer;
-import java.util.Map;
 
 /**
- * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and 
write to user-defined Pulsar topic
+ * A simple Netty Source connector to listen for incoming messages and write 
to user-defined Pulsar topic.
  */
 @Connector(
     name = "netty",
     type = IOType.SOURCE,
-    help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp 
messages and write to user-defined Pulsar topic",
+    help = "A simple Netty Source connector to listen for incoming messages 
and write to user-defined Pulsar topic",
     configClass = NettySourceConfig.class)
 public class NettySource extends PushSource<byte[]> {
 
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
index f5d40e9..1ef4c35 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
@@ -20,17 +20,22 @@ package org.apache.pulsar.io.netty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import lombok.*;
-import lombok.experimental.Accessors;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
 /**
- * Netty Tcp or Udp Source Connector Config.
+ * Netty Source Connector Config.
  */
 @Data
 @Setter
@@ -45,7 +50,7 @@ public class NettySourceConfig implements Serializable {
     @FieldDoc(
             required = true,
             defaultValue = "tcp",
-            help = "The tcp or udp network protocols")
+            help = "The network protocol to use, supported values are 'tcp', 
'udp', and 'http'")
     private String type = "tcp";
 
     @FieldDoc(
@@ -63,8 +68,8 @@ public class NettySourceConfig implements Serializable {
     @FieldDoc(
             required = true,
             defaultValue = "1",
-            help = "The number of threads of Netty Tcp Server to accept 
incoming connections and " +
-                    "handle the traffic of the accepted connections")
+            help = "The number of threads of Netty Tcp Server to accept 
incoming connections and "
+                    + "handle the traffic of the accepted connections")
     private int numberOfThreads = 1;
 
     public static NettySourceConfig load(Map<String, Object> map) throws 
IOException {
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
similarity index 65%
copy from 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
index b5fa820..dce26fb 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializer.java
@@ -16,28 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.http;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
 
 /**
- * Netty Channel Initializer to register decoder and handler
+ * Netty Channel Initializer to register HTTP decoder and handler.
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
+public class NettyHttpChannelInitializer extends 
ChannelInitializer<SocketChannel> {
 
+    private final SslContext sslCtx;
     private ChannelInboundHandlerAdapter handler;
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+    public NettyHttpChannelInitializer(ChannelInboundHandlerAdapter handler, 
SslContext sslCtx) {
         this.handler = handler;
+        this.sslCtx = sslCtx;
     }
 
     @Override
     protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
+        if (sslCtx != null) {
+            
socketChannel.pipeline().addLast(sslCtx.newHandler(socketChannel.alloc()));
+        }
+        socketChannel.pipeline().addLast(new HttpServerCodec());
         socketChannel.pipeline().addLast(this.handler);
     }
-
 }
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
new file mode 100644
index 0000000..ef4cf8c
--- /dev/null
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.http;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.CharsetUtil;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import lombok.Data;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.netty.NettySource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles a server-side HTTP channel.
+ */
+@ChannelHandler.Sharable
+public class NettyHttpServerHandler extends 
SimpleChannelInboundHandler<Object> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyHttpServerHandler.class);
+
+    private NettySource nettySource;
+
+    public NettyHttpServerHandler(NettySource nettySource) {
+        this.nettySource = nettySource;
+    }
+
+    private HttpRequest request;
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+
+        if (msg instanceof HttpRequest) {
+            HttpRequest request = this.request = (HttpRequest) msg;
+
+            if (HttpUtil.is100ContinueExpected(request)) {
+                send100Continue(ctx);
+            }
+        }
+
+        if (msg instanceof HttpContent) {
+            HttpContent httpContent = (HttpContent) msg;
+
+            ByteBuf content = httpContent.content();
+            if (content.isReadable()) {
+                nettySource.consume(new 
NettyHttpRecord(Optional.ofNullable(""),
+                        content.toString(CharsetUtil.UTF_8).getBytes()));
+            }
+
+            if (msg instanceof LastHttpContent) {
+                LastHttpContent trailer = (LastHttpContent) msg;
+
+                if (!writeResponse(trailer, ctx)) {
+                    // If keep-alive is off, close the connection once the 
content is fully written.
+                    
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+                }
+            }
+        }
+    }
+
+    private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext 
ctx) {
+        // Decide whether to close the connection or not.
+        boolean keepAlive = HttpUtil.isKeepAlive(request);
+        // Build the response object.
+        FullHttpResponse response = new DefaultFullHttpResponse(
+                HttpVersion.HTTP_1_1,
+                currentObj.decoderResult().isSuccess() ? HttpResponseStatus.OK 
: HttpResponseStatus.BAD_REQUEST,
+                Unpooled.EMPTY_BUFFER);
+
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; 
charset=UTF-8");
+
+        if (keepAlive) {
+            // Add 'Content-Length' header only for a keep-alive connection.
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 
response.content().readableBytes());
+            // Add keep alive header as per:
+            // - 
http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
+            response.headers().set(HttpHeaderNames.CONNECTION, 
HttpHeaderValues.KEEP_ALIVE);
+        }
+
+        // Write the response.
+        ctx.write(response);
+
+        return keepAlive;
+    }
+
+    private static void send100Continue(ChannelHandlerContext ctx) {
+        FullHttpResponse response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
+        ctx.write(response);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        logger.error("Error when processing incoming data", cause);
+        ctx.close();
+    }
+
+    @Data
+    static private class NettyHttpRecord implements Record<byte[]>, 
Serializable {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+}
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
similarity index 50%
copy from 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
index b5fa820..2e71305 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/package-info.java
@@ -16,28 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
+package org.apache.pulsar.io.netty.http;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java
similarity index 50%
copy from 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java
index b5fa820..482e05a 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/package-info.java
@@ -16,28 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
+package org.apache.pulsar.io.netty;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
index b5fa820..b9a7b4c 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
@@ -24,7 +24,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
- * Netty Channel Initializer to register decoder and handler
+ * Netty Channel Initializer to register decoder and handler.
  */
 public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
 
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 2cb8031..775b6f4 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.io.netty.server;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -29,11 +32,13 @@ import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Netty Tcp or Udp Server to accept any incoming data through Tcp.
+ * Netty Server to accept incoming data via the configured type.
  */
 public class NettyServer {
 
@@ -58,28 +63,32 @@ public class NettyServer {
     public void run() {
         try {
             switch (type) {
-                case TCP:
-                    runTcp();
-                    break;
                 case UDP:
                     runUdp();
                     break;
+                case HTTP:
+                    runHttp();
+                    break;
+                case TCP:
                 default:
                     runTcp();
                     break;
             }
-        } catch(Exception ex) {
-            logger.error("Error occurred when Netty Tcp or Udp Server is 
running", ex);
+        } catch (Exception ex) {
+            logger.error("Error occurred when Netty Server is running", ex);
         } finally {
             shutdownGracefully();
         }
     }
 
     public void shutdownGracefully() {
-        if (workerGroup != null)
+        if (workerGroup != null) {
             workerGroup.shutdownGracefully();
-        if (bossGroup != null)
+        }
+
+        if (bossGroup != null) {
             bossGroup.shutdownGracefully();
+        }
     }
 
     private void runUdp() throws InterruptedException {
@@ -95,17 +104,32 @@ public class NettyServer {
     }
 
     private void runTcp() throws InterruptedException {
+        ServerBootstrap serverBootstrap = getServerBootstrap(
+                new NettyChannelInitializer(new 
NettyServerHandler(this.nettySource)));
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private void runHttp() throws InterruptedException {
+        ServerBootstrap serverBootstrap = getServerBootstrap(
+                new NettyHttpChannelInitializer(new 
NettyHttpServerHandler(this.nettySource), null));
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private ServerBootstrap getServerBootstrap(ChannelHandler childHandler) {
         bossGroup = new NioEventLoopGroup(this.numberOfThreads);
         workerGroup = new NioEventLoopGroup(this.numberOfThreads);
         ServerBootstrap serverBootstrap = new ServerBootstrap();
         serverBootstrap.group(bossGroup, workerGroup);
         serverBootstrap.channel(NioServerSocketChannel.class);
-        serverBootstrap.childHandler(new NettyChannelInitializer(new 
NettyServerHandler(this.nettySource)))
-                .option(ChannelOption.SO_BACKLOG, 1024)
-                .childOption(ChannelOption.SO_KEEPALIVE, true);
+        serverBootstrap.childHandler(childHandler)
+        .option(ChannelOption.SO_BACKLOG, 1024)
+        .childOption(ChannelOption.SO_KEEPALIVE, true);
 
-        ChannelFuture channelFuture = serverBootstrap.bind(this.host, 
this.port).sync();
-        channelFuture.channel().closeFuture().sync();
+        return serverBootstrap;
     }
 
     /**
@@ -145,11 +169,11 @@ public class NettyServer {
         }
 
         public NettyServer build() {
-            Preconditions.checkNotNull(this.type, "type cannot be blank/null");
-            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host 
cannot be blank/null");
-            Preconditions.checkArgument(this.port >= 1024, "port must be set 
equal or bigger than 1024");
-            Preconditions.checkNotNull(this.nettySource, "nettySource must be 
set");
-            Preconditions.checkArgument(this.numberOfThreads > 0,
+            checkNotNull(this.type, "type cannot be blank/null");
+            checkArgument(StringUtils.isNotBlank(host), "host cannot be 
blank/null");
+            checkArgument(this.port >= 1024, "port must be set equal or bigger 
than 1024");
+            checkNotNull(this.nettySource, "nettySource must be set");
+            checkArgument(this.numberOfThreads > 0,
                     "numberOfThreads must be set as positive");
 
             return new NettyServer(this);
@@ -157,13 +181,11 @@ public class NettyServer {
     }
 
     /**
-     * tcp or udp network protocol
+     * Network protocol.
      */
     public enum Type {
-
         TCP,
-
-        UDP
+        UDP,
+        HTTP
     }
-
 }
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
index 81fd203..42f497e 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
@@ -18,18 +18,21 @@
  */
 package org.apache.pulsar.io.netty.server;
 
-import io.netty.channel.*;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.io.Serializable;
+import java.util.Optional;
+
 import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.Optional;
-
 /**
- * Handles a server-side channel
+ * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
 public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
similarity index 50%
copy from 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
index b5fa820..5a1ab94 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/package-info.java
@@ -16,28 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
+package org.apache.pulsar.io.netty.server;
\ No newline at end of file
diff --git 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
similarity index 50%
copy from 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to 
pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
index b5fa820..eae087b 100644
--- 
a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ 
b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/http/NettyHttpChannelInitializerTest.java
@@ -16,28 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.http;
 
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.pulsar.io.netty.NettySource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
- * Netty Channel Initializer to register decoder and handler
+ * Tests for Netty Channel Initializer
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> 
{
+public class NettyHttpChannelInitializerTest {
 
-    private ChannelInboundHandlerAdapter handler;
+    @Test
+    public void testChannelInitializer() throws Exception {
+        NioSocketChannel channel = new NioSocketChannel();
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
+        NettyHttpChannelInitializer nettyChannelInitializer = new 
NettyHttpChannelInitializer(
+                new NettyHttpServerHandler(new NettySource()), null);
+        nettyChannelInitializer.initChannel(channel);
 
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
+        assertNotNull(channel.pipeline().toMap());
+        assertEquals(2, channel.pipeline().toMap().size());
     }
 
 }
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 05a43dc..c81ee98 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,4 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
-- [Netty Tcp or Udp Source Connector](io-netty.md#source)
+- [Netty Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-netty.md b/site2/docs/io-netty.md
index 479752f..4b3d16b 100644
--- a/site2/docs/io-netty.md
+++ b/site2/docs/io-netty.md
@@ -6,7 +6,7 @@ sidebar_label: Netty Tcp or Udp Connector
 
 ## Source
 
-The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from 
Tcp/Udp Client and write them to user-defined Pulsar topic.
+The Netty Source connector opens a port that accepts incoming data via the 
configured network protocol and publishes it to a user-defined Pulsar topic.
 Also, this connector is suggested to be used in a containerized (e.g. k8s) 
deployment.
 Otherwise, if the connector is running in process or thread mode, the 
instances may be conflicting on listening to ports.
 
@@ -14,7 +14,7 @@ Otherwise, if the connector is running in process or thread 
mode, the instances
 
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
-| `type` | `false` | `tcp` | The tcp or udp network protocol required by 
netty. |
+| `type` | `false` | `tcp` | The network protocol over which data is 
transmitted to netty. Valid values include HTTP, TCP, and UDP. |
 | `host` | `false` | `127.0.0.1` | The host name or address that the source 
instance to listen on. |
 | `port` | `false` | `10999` | The port that the source instance to listen on. 
|
 | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp 
Server to accept incoming connections and handle the traffic of the accepted 
connections. |

Reply via email to