Hi:

I have a Linux RedHat-6 application using *Camel 2.15.2* and *netty3*, and I
need to reply to various UDP clients that send UDP requests into my Camel
route.  However, I can't do a blind ACK. That is, I first have to process
each received UDP request and synchronously produce it to a remote Kafka
cluster. Only after the Exchange has been successfully produced to Kafka can
I formulate the associated UDP reply and send it back to the originating UDP
client.

To do this, my input route consumes via *netty:udp* on a well-known listen
port and places all received exchanges on a *SEDA_KAFKA_QUEUE*. A second
route consumes from that queue and produces to Kafka.  In this Kafka route
I'm using the Camel *onCompletion().onCompleteOnly()* mechanism to send the
successfully processed Exchanges to a *SEDA_UDP_ACK_QUEUE*. A third route
consumes from that queue, uses a processor to formulate the ACK body, and
then sends it as the UDP reply via the Camel netty component.

So far I can only get this to work and actually send the ACK if I hard-code
the port number in the netty URI. In production, however, I'll be receiving
from many different UDP clients, and each reply has to go back to that
client's ephemeral port number.  I'm looking for a way to parameterize the
sending of the UDP replies using the *host:port* combination specified in the
*CamelNettyRemoteAddress* header of the Exchange.
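
To make the goal concrete, here is a minimal, untested sketch of the kind of parameterization I have in mind. It assumes the *CamelNettyRemoteAddress* header holds a java.net.InetSocketAddress (I have not verified this for every configuration), and the AckUriBuilder class and its ackUri method are hypothetical names of my own, not part of Camel.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

// Hypothetical helper (not part of Camel): turns a client's socket address
// into a netty UDP endpoint URI aimed at that client's host and port.
final class AckUriBuilder {

    private AckUriBuilder() { }

    // remote:  assumed to be the value of the CamelNettyRemoteAddress header
    // options: endpoint options without the leading '?', or null/empty for none
    static String ackUri(SocketAddress remote, String options) {
        if (!(remote instanceof InetSocketAddress)) {
            throw new IllegalArgumentException(
                    "Expected an InetSocketAddress but got: " + remote);
        }
        InetSocketAddress inet = (InetSocketAddress) remote;

        // getHostString() returns the host name or literal IP address
        // without triggering a reverse DNS lookup.
        String host = inet.getHostString();
        if (host.indexOf(':') >= 0) {
            host = "[" + host + "]"; // bracket IPv6 literals for use in a URI
        }

        String uri = "netty:udp://" + host + ":" + inet.getPort();
        return (options == null || options.isEmpty()) ? uri : uri + "?" + options;
    }
}
```

The idea would be to compute this string in the ACK processor, store it in a header, and send to it through a dynamic endpoint (for example a recipientList on that header) instead of the fixed URI. I have not confirmed that this works with netty UDP, or that the reply would leave from the listen port the client expects.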

Any thoughts greatly appreciated.  Below are my routes.

  Thanks, SteveR

         
                // ------------------------
                // UDP input route
                // ------------------------
                from("netty:udp://dloco.m.mission.net:11111?serverPipelineFactory=#MY_SERVER_PIPELINE_FACTORY&keepAlive=true&sync=true&orderedThreadPoolExecutor=false&receiveBufferSize=26214400&sendBufferSize=26214400&allowDefaultCodec=false&disconnectOnNoReply=false&receiveBufferSizePredictor=8192")
                        .setExchangePattern(ExchangePattern.InOnly)
                        .routeId(sourceRouteId)
                        .startupOrder(routeStartupOrder)
                        .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName))
                        .threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 /*maxPoolSize*/).threadName("threads_" + sourceRouteId)
                        .to(kafkaQueueURI).id(sourceRouteId + "_TO_KAFKA_QUEUE");

                // ------------------------
                // SEDA_KAFKA_QUEUE
                // ------------------------
                from(kafkaQueueURI)
                        .errorHandler(deadLetterChannelBuilder) // Add route-scoped DeadLetterChannel error handler
                        .onCompletion()
                                .onCompleteOnly()     // Synchronize only after Exchange completes successfully with no errors
                                .parallelProcessing() // Tells Camel to use a thread pool for onCompletion route
                                .to(ackQueueURI).id(sourceRouteId + "_ON_COMPLETION_ONLY_TO_ACK_QUEUE")
                        .end() // Must use end() to denote the end of the onCompletion route
                        .setExchangePattern(ExchangePattern.InOnly)
                        .routeId(kafkaQueueRouteId)
                        .startupOrder(kafkaQueueRouteStartupOrder)
                        .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName))
                        .threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 /*maxPoolSize*/).threadName("threads_" + kafkaQueueRouteId)
                        // ------------------------------------------------
                        // This processor handles Kafka related processing.
                        // For example, determining the Kafka partitioning.
                        // ------------------------------------------------
                        .process(kafkaProcessor)
                                .id(kafkaProcessorId)
                        // ---------------------------------------------------
                        // Here we route to the final destination (e.g. Kafka)
                        // ---------------------------------------------------
                        .to(kafkaToURI);

                // ----------------------------
                // SEDA_UDP_ACK_QUEUE
                // ----------------------------
                from(ackQueueURI)
                        .setExchangePattern(ExchangePattern.InOut)
                        .routeId(ackQueueRouteId)
                        .startupOrder(ackQueueRouteStartupOrder)
                        .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName))
                        .process(ackBackProcessor)
                                .id(ackBackProcessorId)
                        // Hard-coded reply port; this is the part I need to parameterize
                        .to("netty:udp://dloco.m.mission.net:9998?clientPipelineFactory=#MY_CLIENT_PIPELINE_FACTORY&sendBufferSize=26214400&allowDefaultCodec=false");



