Hi: I have a Linux RedHat-6 application using *Camel 2.15.2* and *netty3* and I need to reply to various UDP clients that send UDP requests into my Camel route. However, I can't do a blind-ACK. That is, I have to first process each received UDP request, synchronously produce it to a remote Kafka cluster, and only when the Exchange is successfully produced to Kafka, then formulate the associated UDP reply and send it back to the associated UDP client.
To do this my input route consumes via *netty:udp* on a well-known listen port, and I then place all received exchanges on a *SEDA_KAFKA_QUEUE* that consumes them and produces to Kafka. I'm using the Camel *onCompletion.onCompleteOnly()* mechanism in this Kafka route to trigger sending the successfully processed Exchanges to a *SEDA_UDP_ACK_QUEUE* that will use a processor to formulate the ACK body and then send it as the UDP reply via the Camel netty component. I can only get this to work and actually send the ACK if I use a hard-coded port number in the netty URI. But in production, I'll be receiving from many different UDP clients and I need to send back to their ephemeral port number. I'm looking for a way to parameterize the sending of the UDP replies back using the *host:port* combo that is specified in the *CamelNettyRemoteAddress* header of the Exchange. Any thoughts greatly appreciated. Below are my routes. Thanks, SteveR from("netty:udp://dloco.m.mission.net:11111?serverPipelineFactory=#MY_SERVER_PIPELINE_FACTORY&keepAlive=true&sync=true&orderedThreadPoolExecutor=false&receiveBufferSize=26214400&sendBufferSize=26214400&allowDefaultCodec=false&disconnectOnNoReply=false&receiveBufferSizePredictor=8192") .setExchangePattern(ExchangePattern.InOnly) .routeId(sourceRouteId) .startupOrder(routeStartupOrder) .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName)) .threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 /*maxPoolSize*/).threadName("threads_" + sourceRouteId); .to(kafkaQueueURI).id(sourceRouteId + "_TO_KAFKA_QUEUE"); // ------------------------ // SEDA_KAFKA_QUEUE // ------------------------ from(kafkaQueueURI) .errorHandler(deadLetterChannelBuilder) // Add route-scoped DeadLetterChannel error handler .onCompletion() .onCompleteOnly() // Synchronize only after Exchange completes successfully with no errors .parallelProcessing() // Tells Camel to use a thread pool for onCompletion route .to(ackQueueURI).id(sourceRouteId + "_ON_COMPLETION_ONLY_TO_ACK_QUEUE") .end() // Must use end() to denote the end of the onCompletion route .setExchangePattern(ExchangePattern.InOnly) .routeId(kafkaQueueRouteId) .startupOrder(kafkaQueueRouteStartupOrder) .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName)) .threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 /*maxPoolSize*/).threadName("threads_" + kafkaQueueRouteId) // ------------------------------------------------ // This processor handles Kafka related processing. // For example, determining the Kafka partitioning. // ------------------------------------------------ .process(kafkaProcessor) .id(kafkaProcessorId) // --------------------------------------------------- // Here we route to the final destination (e.g. Kafka) // --------------------------------------------------- .to(kafkaToURI); // ---------------------------- // SEDA_UDP_ACK_QUEUE // ---------------------------- from(ackQueueURI) .setExchangePattern(ExchangePattern.InOut) .routeId(ackQueueRouteId) .startupOrder(ackQueueRouteStartupOrder) .setProperty(Exchange.CHARSET_NAME, ExpressionBuilder.constantExpression(charsetName)) .process(ackBackProcessor) .id(ackBackProcessorId) .to("netty:udp://dloco.m.mission.net:9998?clientPipelineFactory=#MY_CLIENT_PIPELINE_FACTORY&sendBufferSize=26214400&allowDefaultCodec=false"); -- View this message in context: http://camel.465427.n5.nabble.com/Camel-netty-How-to-send-UDP-reply-based-on-NETTY-REMOTE-ADDRESS-tp5773044.html Sent from the Camel - Users mailing list archive at Nabble.com.