I have a linux Java7 stand-alone application using Camel 2.14 and the
camel-netty component. I have a route that receives via netty:udp and then
attempts to mirror the received packets also via netty:udp. I'm also using
the *LengthFieldBasedFrameDecoder *to decode the UDP packets into message
frames based on a 2-byte length field within each UDP message.

The UDP messages I'm receiving contain certain characters that don't decode
in UTF-8 (e.g. 0xa1 0xb2, 0xc3 ), so I've been trying to use the
*iso-8859-1* charset. I'm thinking that what I want in my
ServerPipelineFactory subclass is the
*io.netty.handler.codec.bytes.ByteArrayDecoder*, but I'm unable to get it to
compile with the *addlast()* method.

I've been using the approach outlined  here
<https://github.com/apache/camel/blob/master/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java>
 
thus far, but then I run into this compile issue with *io.netty *versus
*org.jboss.netty*.

Which leads me to believe that I'm confused wrt imports for *io.netty*
versus *org.jboss.netty*?  Below is my extended *ServerPipelineFactory
*class which I'd like to switch the StringDecoder/StringEncoder for
ByteArrayDecoder/ByteArrayEncoder.

Any help is greatly appreciated!

  Thanks, SteveR


package multiprotocollistenerrouter;

//import io.netty.handler.codec.bytes.ByteArrayDecoder;
//import io.netty.handler.codec.bytes.ByteArrayEncoder;
//import io.netty.channel.ChannelPipeline;
//import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.util.CharsetUtil;
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.ServerPipelineFactory;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

//import org.jboss.netty.handler.logging.LoggingHandler;
//import org.jboss.netty.logging.InternalLogLevel;

import org.slf4j.Logger;        // The org.slf4j.Logger interface is the
main user entry point of SLF4J API.
import org.slf4j.LoggerFactory; // Utility class producing Loggers for
various logging APIs, most notably for log4j.

/**
 * Consumer linked channel pipeline factory for the MCQ source provider.
 * Provides custom support for reception/parsing/processing of MCQ.
 *
 * @author steve
 *
 * @see http://camel.apache.org/netty.html
 * @see
http://opensourceknowledge.blogspot.com/2010/08/customizing-netty-endpoints-using.html#
 * @see
http://seeallhearall.blogspot.com/2012/06/netty-tutorial-part-15-on-channel.html
 * @see
https://github.com/apache/camel/blob/master/components/camel-netty/src/test/
 *     
java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
 */
public class McqServerPipelineFactory extends ServerPipelineFactory  {
    private final static Logger logger   =
LoggerFactory.getLogger(McqServerPipelineFactory.class);
    private static final String NEW_LINE =
System.getProperty("line.separator");

    // -------------------------------------------------------------------
    // Stateless, singleton handler instances...re-used across connections
    // -------------------------------------------------------------------
    private static final ChannelHandler STR_ENCODER = new
StringEncoder(CharsetUtil.ISO_8859_1);
    private static final ChannelHandler STR_DECODER = new
StringDecoder(CharsetUtil.ISO_8859_1);

//    private static final ChannelHandler LOG_HANDLER = new
LoggingHandler(InternalLogLevel.INFO);

//    private static final ByteArrayDecoder BYTES_DECODER = new
ByteArrayDecoder();
//    private static final ByteArrayEncoder BYTES_ENCODER = new
ByteArrayEncoder();

    private final NettyConsumer consumer;

    private final int maxFrameLength;
    private final int lengthFieldOffset;
    private final int lengthFieldLength;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;
    private boolean   invoked;
    private String    routeId;

    @Override
    public ChannelPipeline getPipeline() throws Exception {  
        logger.trace("getPipeline(): ENTER");

        invoked = true;  
        ChannelPipeline channelPipeline = Channels.pipeline();  

        String theRouteId = consumer.getRoute().getId();
        logger.info("getPipeline(): {}, routeId = {}", consumer.toString(),
theRouteId);

        // -----------------------------------------------
        // Add logger to print incoming and outgoing data.
        // This is both an upstream/downstream handler.
        // -----------------------------------------------
//        String loggerName = "MCQ_LOGGING_HANDLER_" + theRouteId;
//        channelPipeline.addLast(loggerName, LOG_HANDLER);

        //
-----------------------------------------------------------------------------------
        // Here we add the custom message decoder. Frame decoders are
typically
        // stateful, thus we create a new instance with every pipeline.
        // See
https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffer.html
        // See
https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffers.html
        //
-----------------------------------------------------------------------------------
        String lengthFieldBasedFrameDecoderName =
"MCQ_LENGTH_FIELD_FRAME_BASED_DECODER_" + theRouteId;
        channelPipeline.addLast(lengthFieldBasedFrameDecoderName,
                new LengthFieldBasedFrameDecoder(maxFrameLength,
                                                 lengthFieldOffset,
                                                 lengthFieldLength,
                                                 lengthAdjustment,
                                                 initialBytesToStrip));

        if(logger.isInfoEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append("getPipeline(): Added
").append(lengthFieldBasedFrameDecoderName).append(":").
               append(NEW_LINE).
               append("maxFrameLength      = ").append(maxFrameLength)  
.append(NEW_LINE).
               append("lengthFieldOffset   =
").append(lengthFieldOffset).append(NEW_LINE).
               append("lengthFieldLength   =
").append(lengthFieldLength).append(NEW_LINE).
               append("lengthAdjustment    = ").append(lengthAdjustment)
.append(NEW_LINE).
               append("initialBytesToStrip = ").append(initialBytesToStrip);
            logger.info(sb.toString());
        }

        // -----------------------------------------------------------
        // Add string encoder (downstream) / string decoder (upstream)
        // -----------------------------------------------------------
        String stringDecoderName = "MCQ_STRING_DECODER_" + theRouteId;
        channelPipeline.addLast(stringDecoderName, STR_DECODER);
        String stringEncoderName = "MCQ_STRING_ENCODER_" + theRouteId;
        channelPipeline.addLast(stringEncoderName, STR_ENCODER);

        // -----------------------------------------------------------
        // Add byte[] encoder (downstream) / byte[] decoder (upstream)
        // -----------------------------------------------------------
//        String bytesDecoderName = "MCQ_BYTES_DECODER_" + theRouteId; 
//        channelPipeline.addLast(bytesDecoderName, BYTES_DECODER);
//        String bytesEncoderName = "MCQ_BYTES_ENCODER_" + theRouteId; 
//        channelPipeline.addLast(bytesEncoderName, BYTES_ENCODER);

        // ------------------------------------------------------
        // Here we add the default Camel ServerChannelHandler for
        // the consumer, to allow Camel to route the message, etc.
        // ------------------------------------------------------
        String serverChannelHandlerName = "MCQ_SERVER_CHANNEL_HANDLER_" +
theRouteId;
        channelPipeline.addLast(serverChannelHandlerName, new
ServerChannelHandler(consumer));
        logger.info("getPipeline(): Added default Camel {}",
serverChannelHandlerName);

        logger.trace("getPipeline(): EXIT");
        return channelPipeline;  
    }

    /**
     * Create the server pipeline factory.
     *
     * @param consumer
     * @return ServerPipelineFactory
     */
    @Override
    public ServerPipelineFactory createPipelineFactory(NettyConsumer
consumer) {
        logger.trace("createPipelineFactory(NettyConsumer): ENTER with {}",
                    (consumer == null) ? "null" : consumer);

        if(consumer != null) {
            // -------------------------------------------------------------
            // - Get routeId from NettyConsumer input.
            // - Get serverPipelineFactoryName from routeId.
            // - Get LengthFieldBasedFrameDecoderBean from RouteManager.
            // - Construct McqServerPipelineFactory from
            //   NettyConsumer and LengthFieldBasedFrameDecoderBean members.
            // -------------------------------------------------------------
            String theRouteId = consumer.getRoute().getId();

            String serverPipelineFactoryName =
RouteManager.getServerPipelineFactoryName(theRouteId);

            LengthFieldBasedFrameDecoderBean
lengthFieldBasedFrameDecoderBean =
               
RouteManager.getLengthFieldBasedFrameDecoderInfo(serverPipelineFactoryName);
            
            Integer theMaxFrameLength      =
lengthFieldBasedFrameDecoderBean.getMaxFrameLengthInBytes();
            Integer theLengthFieldOffset   =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldOffset();
            Integer theLengthFieldLength   =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldLength();
            Integer theLengthAdjustment    =
lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthAdjustment();
            Integer theInitialBytesToStrip =
lengthFieldBasedFrameDecoderBean.getFrameDecoderInitialBytesToStrip();

            if(logger.isInfoEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Constructing McqServerPipelineFactory:")             
 
.append(NEW_LINE).
                   append("consumer            = ").append(consumer)            
 
.append(NEW_LINE).
                   append("maxFrameLength      =
").append(theMaxFrameLength)     .append(NEW_LINE).
                   append("lengthFieldOffset   =
").append(theLengthFieldOffset)  .append(NEW_LINE).
                   append("lengthFieldLength   =
").append(theLengthFieldLength)  .append(NEW_LINE).
                   append("lengthAdjustment    =
").append(theLengthAdjustment)   .append(NEW_LINE).
                   append("initialBytesToStrip =
").append(theInitialBytesToStrip).append(NEW_LINE).
                   append("routeId             = ").append(theRouteId);
                logger.info(sb.toString());
            }

            logger.trace("createPipelineFactory(): EXIT");
            return new McqServerPipelineFactory(consumer,
                                                theMaxFrameLength,
                                                theLengthFieldOffset,
                                                theLengthFieldLength,
                                                theLengthAdjustment,
                                                theInitialBytesToStrip,
                                                theRouteId);
        }

        logger.trace("createPipelineFactory(): EXIT"); 
        return null;
    }

    public boolean isfactoryInvoked() {  
        return invoked;  
    }

    public McqServerPipelineFactory(NettyConsumer consumer) {
        logger.trace("McqServerPipelineFactory(NettyConsumer): ENTER with
{}", (consumer == null) ? "null" : consumer);

        this.maxFrameLength      = 0;
        this.lengthFieldOffset   = 0;
        this.lengthFieldLength   = 0;
        this.lengthAdjustment    = 0;
        this.initialBytesToStrip = 0;
        this.consumer            = consumer;

        logger.trace("McqServerPipelineFactory(NettyConsumer): EXIT");
    }

    public McqServerPipelineFactory(
            NettyConsumer consumer,
            Integer maxFrameLength,
            Integer lengthFieldOffset,
            Integer lengthFieldLength,
            Integer lengthAdjustment,
            Integer initialBytesToStrip,
            String  routeId) {
        if(logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append("McqServerPipelineFactory(): ENTER with:").
               append(NEW_LINE).
               append("consumer            = ").append(consumer)          
.append(NEW_LINE).
               append("maxFrameLength      = ").append(maxFrameLength)    
.append(NEW_LINE).
               append("lengthFieldOffset   = ").append(lengthFieldOffset) 
.append(NEW_LINE).
               append("lengthFieldLength   = ").append(lengthFieldLength) 
.append(NEW_LINE).
               append("lengthAdjustment    = ").append(lengthAdjustment)  
.append(NEW_LINE).
               append("initialBytesToStrip =
").append(initialBytesToStrip).append(NEW_LINE).
               append("routeId             = ").append(routeId);
            logger.trace(sb.toString());
        }

        this.consumer            = consumer;
        this.maxFrameLength      = maxFrameLength;
        this.lengthFieldOffset   = lengthFieldOffset;
        this.lengthFieldLength   = lengthFieldLength;
        this.lengthAdjustment    = lengthAdjustment;
        this.initialBytesToStrip = initialBytesToStrip;
        this.routeId             = routeId;

        logger.trace("McqServerPipelineFactory(): EXIT");
    }
}
// ------------- end --------------





--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-2-14-Netty-How-to-add-ByteArrayDecoder-to-ServerChannelPipeline-tp5768638.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to