Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2704#discussion_r193436151 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java --- @@ -12,28 +12,40 @@ package org.apache.storm.messaging.netty; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import java.util.Map; import org.apache.storm.Config; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; - -class StormServerPipelineFactory implements ChannelPipelineFactory { - private Server server; - - StormServerPipelineFactory(Server server) { +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; + +class StormServerPipelineFactory extends ChannelInitializer<Channel> { + private final KryoValuesSerializer ser; + private final KryoValuesDeserializer deser; + private final Map<String, Object> topoConf; + private final Server server; + + StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser, + Map<String, Object> topoConf, Server server) { + this.ser = ser; + this.deser = deser; + this.topoConf = topoConf; this.server = server; } - public ChannelPipeline getPipeline() throws Exception { + @Override + protected void initChannel(Channel ch) throws Exception { // Create a default pipeline implementation. - ChannelPipeline pipeline = Channels.pipeline(); - + ChannelPipeline pipeline = ch.pipeline(); + // Decoder --- End diff -- nit: whitespace
---