Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2704#discussion_r193436151
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java ---
    @@ -12,28 +12,40 @@
     
     package org.apache.storm.messaging.netty;
     
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelInitializer;
    +import io.netty.channel.ChannelPipeline;
    +import java.util.Map;
     import org.apache.storm.Config;
    -import org.jboss.netty.channel.ChannelPipeline;
    -import org.jboss.netty.channel.ChannelPipelineFactory;
    -import org.jboss.netty.channel.Channels;
    -
    -class StormServerPipelineFactory implements ChannelPipelineFactory {
    -    private Server server;
    -
    -    StormServerPipelineFactory(Server server) {
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +
    +class StormServerPipelineFactory extends ChannelInitializer<Channel> {
    +    private final KryoValuesSerializer ser;
    +    private final KryoValuesDeserializer deser;
    +    private final Map<String, Object> topoConf;
    +    private final Server server;
    +
    +    StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
    +        Map<String, Object> topoConf, Server server) {
    +        this.ser = ser;
    +        this.deser = deser;
    +        this.topoConf = topoConf;
             this.server = server;
         }
     
    -    public ChannelPipeline getPipeline() throws Exception {
    +    @Override
    +    protected void initChannel(Channel ch) throws Exception {
             // Create a default pipeline implementation.
    -        ChannelPipeline pipeline = Channels.pipeline();
    -
    +        ChannelPipeline pipeline = ch.pipeline();
    +        
             // Decoder
    --- End diff --
    
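    nit: whitespace — the added blank line just before `// Decoder` consists only of trailing spaces; it should be an empty line.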


---

Reply via email to