I have a linux Java7 stand-alone application using Camel 2.14 and the camel-netty component. I have a route that receives via netty:udp and then attempts to mirror the received packets also via netty:udp. I'm also using the *LengthFieldBasedFrameDecoder *to decode the UDP packets into message frames based on a 2-byte length field within each UDP message.
The UDP messages I'm receiving contain certain characters that don't decode in UTF-8 (e.g. 0xa1 0xb2, 0xc3 ), so I've been trying to use the *iso-8859-1* charset. I'm thinking that what I want in my ServerPipelineFactory subclass is the *io.netty.handler.codec.bytes.ByteArrayDecoder*, but I'm unable to get it to compile with the *addlast()* method. I've been using the approach outlined here <https://github.com/apache/camel/blob/master/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java> thus far, but then I run into this compile issue with *io.netty *versus *org.jboss.netty*. Which leads me to believe that I'm confused wrt imports for *io.netty* versus *org.jboss.netty*? Below is my extended *ServerPipelineFactory *class which I'd like to switch the StringDecoder/StringEncoder for ByteArrayDecoder/ByteArrayEncoder. Any help is greatly appreciated! Thanks, SteveR package multiprotocollistenerrouter; //import io.netty.handler.codec.bytes.ByteArrayDecoder; //import io.netty.handler.codec.bytes.ByteArrayEncoder; //import io.netty.channel.ChannelPipeline; //import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.CharsetUtil; import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.ServerPipelineFactory; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; //import org.jboss.netty.handler.logging.LoggingHandler; //import org.jboss.netty.logging.InternalLogLevel; import org.slf4j.Logger; // The org.slf4j.Logger interface is the main user entry point of SLF4J API. import org.slf4j.LoggerFactory; // Utility class producing Loggers for various logging APIs, most notably for log4j. /** * Consumer linked channel pipeline factory for the MCQ source provider. * Provides custom support for reception/parsing/processing of MCQ. * * @author steve * * @see http://camel.apache.org/netty.html * @see http://opensourceknowledge.blogspot.com/2010/08/customizing-netty-endpoints-using.html# * @see http://seeallhearall.blogspot.com/2012/06/netty-tutorial-part-15-on-channel.html * @see https://github.com/apache/camel/blob/master/components/camel-netty/src/test/ * java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java */ public class McqServerPipelineFactory extends ServerPipelineFactory { private final static Logger logger = LoggerFactory.getLogger(McqServerPipelineFactory.class); private static final String NEW_LINE = System.getProperty("line.separator"); // ------------------------------------------------------------------- // Stateless, singleton handler instances...re-used across connections // ------------------------------------------------------------------- private static final ChannelHandler STR_ENCODER = new StringEncoder(CharsetUtil.ISO_8859_1); private static final ChannelHandler STR_DECODER = new StringDecoder(CharsetUtil.ISO_8859_1); // private static final ChannelHandler LOG_HANDLER = new LoggingHandler(InternalLogLevel.INFO); // private static final ByteArrayDecoder BYTES_DECODER = new ByteArrayDecoder(); // private static final ByteArrayEncoder BYTES_ENCODER = new ByteArrayEncoder(); private final NettyConsumer consumer; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthAdjustment; private final int initialBytesToStrip; private boolean invoked; private String routeId; @Override public ChannelPipeline getPipeline() throws Exception { logger.trace("getPipeline(): ENTER"); invoked = true; ChannelPipeline channelPipeline = Channels.pipeline(); String theRouteId = consumer.getRoute().getId(); logger.info("getPipeline(): {}, routeId = {}", consumer.toString(), theRouteId); // ----------------------------------------------- // Add logger to print incoming and outgoing data. // This is both an upstream/downstream handler. // ----------------------------------------------- // String loggerName = "MCQ_LOGGING_HANDLER_" + theRouteId; // channelPipeline.addLast(loggerName, LOG_HANDLER); // ----------------------------------------------------------------------------------- // Here we add the custom message decoder. Frame decoders are typically // stateful, thus we create a new instance with every pipeline. // See https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffer.html // See https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffers.html // ----------------------------------------------------------------------------------- String lengthFieldBasedFrameDecoderName = "MCQ_LENGTH_FIELD_FRAME_BASED_DECODER_" + theRouteId; channelPipeline.addLast(lengthFieldBasedFrameDecoderName, new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip)); if(logger.isInfoEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("getPipeline(): Added ").append(lengthFieldBasedFrameDecoderName).append(":"). append(NEW_LINE). append("maxFrameLength = ").append(maxFrameLength) .append(NEW_LINE). append("lengthFieldOffset = ").append(lengthFieldOffset).append(NEW_LINE). append("lengthFieldLength = ").append(lengthFieldLength).append(NEW_LINE). append("lengthAdjustment = ").append(lengthAdjustment) .append(NEW_LINE). append("initialBytesToStrip = ").append(initialBytesToStrip); logger.info(sb.toString()); } // ----------------------------------------------------------- // Add string encoder (downstream) / string decoder (upstream) // ----------------------------------------------------------- String stringDecoderName = "MCQ_STRING_DECODER_" + theRouteId; channelPipeline.addLast(stringDecoderName, STR_DECODER); String stringEncoderName = "MCQ_STRING_ENCODER_" + theRouteId; channelPipeline.addLast(stringEncoderName, STR_ENCODER); // ----------------------------------------------------------- // Add byte[] encoder (downstream) / byte[] decoder (upstream) // ----------------------------------------------------------- // String bytesDecoderName = "MCQ_BYTES_DECODER_" + theRouteId; // channelPipeline.addLast(bytesDecoderName, BYTES_DECODER); // String bytesEncoderName = "MCQ_BYTES_ENCODER_" + theRouteId; // channelPipeline.addLast(bytesEncoderName, BYTES_ENCODER); // ------------------------------------------------------ // Here we add the default Camel ServerChannelHandler for // the consumer, to allow Camel to route the message, etc. // ------------------------------------------------------ String serverChannelHandlerName = "MCQ_SERVER_CHANNEL_HANDLER_" + theRouteId; channelPipeline.addLast(serverChannelHandlerName, new ServerChannelHandler(consumer)); logger.info("getPipeline(): Added default Camel {}", serverChannelHandlerName); logger.trace("getPipeline(): EXIT"); return channelPipeline; } /** * Create the server pipeline factory. * * @param consumer * @return ServerPipelineFactory */ @Override public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) { logger.trace("createPipelineFactory(NettyConsumer): ENTER with {}", (consumer == null) ? "null" : consumer); if(consumer != null) { // ------------------------------------------------------------- // - Get routeId from NettyConsumer input. // - Get serverPipelineFactoryName from routeId. // - Get LengthFieldBasedFrameDecoderBean from RouteManager. // - Construct McqServerPipelineFactory from // NettyConsumer and LengthFieldBasedFrameDecoderBean members. // ------------------------------------------------------------- String theRouteId = consumer.getRoute().getId(); String serverPipelineFactoryName = RouteManager.getServerPipelineFactoryName(theRouteId); LengthFieldBasedFrameDecoderBean lengthFieldBasedFrameDecoderBean = RouteManager.getLengthFieldBasedFrameDecoderInfo(serverPipelineFactoryName); Integer theMaxFrameLength = lengthFieldBasedFrameDecoderBean.getMaxFrameLengthInBytes(); Integer theLengthFieldOffset = lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldOffset(); Integer theLengthFieldLength = lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldLength(); Integer theLengthAdjustment = lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthAdjustment(); Integer theInitialBytesToStrip = lengthFieldBasedFrameDecoderBean.getFrameDecoderInitialBytesToStrip(); if(logger.isInfoEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Constructing McqServerPipelineFactory:") .append(NEW_LINE). append("consumer = ").append(consumer) .append(NEW_LINE). append("maxFrameLength = ").append(theMaxFrameLength) .append(NEW_LINE). append("lengthFieldOffset = ").append(theLengthFieldOffset) .append(NEW_LINE). append("lengthFieldLength = ").append(theLengthFieldLength) .append(NEW_LINE). append("lengthAdjustment = ").append(theLengthAdjustment) .append(NEW_LINE). append("initialBytesToStrip = ").append(theInitialBytesToStrip).append(NEW_LINE). append("routeId = ").append(theRouteId); logger.info(sb.toString()); } logger.trace("createPipelineFactory(): EXIT"); return new McqServerPipelineFactory(consumer, theMaxFrameLength, theLengthFieldOffset, theLengthFieldLength, theLengthAdjustment, theInitialBytesToStrip, theRouteId); } logger.trace("createPipelineFactory(): EXIT"); return null; } public boolean isfactoryInvoked() { return invoked; } public McqServerPipelineFactory(NettyConsumer consumer) { logger.trace("McqServerPipelineFactory(NettyConsumer): ENTER with {}", (consumer == null) ? "null" : consumer); this.maxFrameLength = 0; this.lengthFieldOffset = 0; this.lengthFieldLength = 0; this.lengthAdjustment = 0; this.initialBytesToStrip = 0; this.consumer = consumer; logger.trace("McqServerPipelineFactory(NettyConsumer): EXIT"); } public McqServerPipelineFactory( NettyConsumer consumer, Integer maxFrameLength, Integer lengthFieldOffset, Integer lengthFieldLength, Integer lengthAdjustment, Integer initialBytesToStrip, String routeId) { if(logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("McqServerPipelineFactory(): ENTER with:"). append(NEW_LINE). append("consumer = ").append(consumer) .append(NEW_LINE). append("maxFrameLength = ").append(maxFrameLength) .append(NEW_LINE). append("lengthFieldOffset = ").append(lengthFieldOffset) .append(NEW_LINE). append("lengthFieldLength = ").append(lengthFieldLength) .append(NEW_LINE). append("lengthAdjustment = ").append(lengthAdjustment) .append(NEW_LINE). append("initialBytesToStrip = ").append(initialBytesToStrip).append(NEW_LINE). append("routeId = ").append(routeId); logger.trace(sb.toString()); } this.consumer = consumer; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; this.initialBytesToStrip = initialBytesToStrip; this.routeId = routeId; logger.trace("McqServerPipelineFactory(): EXIT"); } } // ------------- end -------------- -- View this message in context: http://camel.465427.n5.nabble.com/Camel-2-14-Netty-How-to-add-ByteArrayDecoder-to-ServerChannelPipeline-tp5768638.html Sent from the Camel - Users mailing list archive at Nabble.com.