Ok, I just tried upgrading to *Camel 2.16.0* in the hope that the new 
Dynamic To <http://camel.apache.org/message-endpoint.html>   would solve
this problem, but I still get the same result, that is all exchanges that
are sent to the route stay marked as *ExchangesInflight*.  This time I even
had a UDP listener running using the Linux  socat
<http://www.dest-unreach.org/socat/doc/socat.html>  command and it receives
the correct number of UDP acknowledgements.  So the route is sending all
exchanges that it received on its SEDA queue, but they never transition to
the completed state.

Below is my updated route that uses the new *toD()* method.  Below that is
the route which sends to this ack-back route.

I'm really puzzled by this behavior, but I need to figure it out soon
because my management is getting cranky ;-(

   Thanks for any help,  SteveR


        from(uri="seda:SEDA_ACK_QUEUE?size=200000&concurrentConsumers=10")
                .setExchangePattern(ExchangePattern.InOnly)
                .routeId(ackQueueRouteId)
                .startupOrder(ackQueueRouteStartupOrder)
                .setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
                .process(cqmsAckBackProcessor).id(cqmsAckBackProcessorId)
*               .toD(
                        "netty:udp://${header.REMOTE_HOST_IP}:" +
                        "${header.REMOTE_PORT_NUMBER}?" +
                        "clientPipelineFactory=#MY_CLIENT_PIPELINE_FACTORY&" +
                        "sendBufferSize=26214400&allowDefaultCodec=false"
                )*
                .stop().end();


        from(kafkaQueueURI)
                .errorHandler(deadLetterChannelBuilder) // Add route-scoped
DeadLetterChannel error handler
*               .onCompletion()
                        .parallelProcessing() // Tells Camel to use a thread 
pool for
onCompletion route
                        .onCompleteOnly()     // Synchronize only after 
Exchange completes
successfully with no errors
                        .to(ackQueueURI).id(sourceRouteId + 
"_ON_COMPLETION_ONLY_TO_ACK_QUEUE")
                .end() // Must use end() to denote the end of the onCompletion 
route*
                .setExchangePattern(ExchangePattern.InOnly)
                .routeId(kafkaQueueRouteId)
                .startupOrder(kafkaQueueRouteStartupOrder)
                .setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName))
                .threads(threadPoolSize /*poolSize*/, threadPoolSize * 2 
/*maxPoolSize*/)
                        .threadName("threads_" + kafkaQueueRouteId)
                .callerRunsWhenRejected(true) // Hard-coded since we always 
want this
behavior!
                //
----------------------------------------------------------------------------------------
                // This processor handles Kafka related processing, e.g. 
determining the
Kafka partitioning
                //
----------------------------------------------------------------------------------------
                .process(kafkaProcessor).id(kafkaProcessorId)
                // ---------------------------------------------------
                // Here we route to the final destination (e.g. Kafka)
                // ---------------------------------------------------
                .to(kafkaToURI)
                .setProperty(Exchange.CHARSET_NAME,
ExpressionBuilder.constantExpression(charsetName));


<http://camel.465427.n5.nabble.com/file/n5773080/IN_FLIGHT_1.png> 



--
View this message in context: 
http://camel.465427.n5.nabble.com/NEED-HELP-Exchanges-remain-inflight-on-route-with-recipientList-to-netty-udp-tp5773079p5773080.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to