There is a disconnect option you can try; it is described on the Mina component page:
http://camel.apache.org/mina
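
To make the goal concrete, here is a minimal sketch of what "one connection per message" looks like at the socket level. It uses plain java.net sockets rather than the Camel Mina API, so it only illustrates the behaviour a round-robin load balancer would need to see. The class name ConnectionPerMessageSketch and the method sendEachOnNewConnection are made up for this illustration, and the in-process server stands in for the load balancer.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: not part of Camel or Mina.
public class ConnectionPerMessageSketch {

    /**
     * Sends every message on its own TCP connection to a local test server
     * and returns how many connections that server accepted.
     */
    public static int sendEachOnNewConnection(String[] messages) throws Exception {
        AtomicInteger accepted = new AtomicInteger();
        InetAddress loopback = InetAddress.getLoopbackAddress();

        // Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            Thread acceptor = new Thread(() -> {
                for (int i = 0; i < messages.length; i++) {
                    try (Socket s = server.accept();
                         BufferedReader in = new BufferedReader(
                                 new InputStreamReader(s.getInputStream()))) {
                        accepted.incrementAndGet(); // one accept per client connection
                        in.readLine();              // consume the single text line
                    } catch (IOException e) {
                        return;                     // server closed or client failed
                    }
                }
            });
            acceptor.start();

            for (String message : messages) {
                // A fresh socket per message: connect, write one line, close.
                try (Socket socket = new Socket(loopback, server.getLocalPort());
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                    out.println(message);
                }
            }
            acceptor.join();
        }
        return accepted.get();
    }

    public static void main(String[] args) throws Exception {
        int n = sendEachOnNewConnection(new String[] {"Hello1", "Hello2", "Hello3"});
        System.out.println("connections accepted: " + n);
    }
}
```

Because each message arrives on a separate connection, a balancer that assigns workers per connection would have a chance to rotate between them. Whether disconnect=true gives exactly this behaviour in Camel 2.11.1 is something to verify against the component itself.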

On Mon, Dec 16, 2013 at 11:46 AM, Shing Hing Man <mat...@yahoo.com> wrote:
> Hi,
>
> I am using Camel 2.11.1 Mina to send TCP messages to a remote load
> balancer. The load balancer forwards the TCP messages to three workers in
> round-robin fashion. By default (please correct me if I am wrong), a TCP
> connection created by Mina is kept alive until it times out. Consequently,
> the TCP messages from Camel Mina are always forwarded to the same
> worker by the load balancer.
>
> Is it possible to configure Camel Mina to send each message in a new
> connection? If so, the load balancer will distribute the messages to
> the three workers in round-robin fashion.
>
> I have tried setting "disconnect=true" in the producer endpoint (please see
> TCPEchoServer.java and TCPProducerAsyn.java below), but I get the
> following error when more than 3 messages are sent.
>
>
> [                          main] DefaultErrorHandler            ERROR Failed 
> delivery for (MessageId: ID-gauss-site-36756-1387189162872-0-5 on ExchangeId: 
> ID-gauss-site-36756-1387189162872-0-6). Exhausted after delivery attempt: 1 
> caught: org.apache.camel.CamelExchangeException: Cannot write body. 
> Exchange[Message: Hello3]
> org.apache.camel.CamelExchangeException: Cannot write body. Exchange[Message: 
> Hello3]
>     at 
> org.apache.camel.component.mina.MinaHelper.writeBody(MinaHelper.java:55)[camel-mina-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.component.mina.MinaProducer.doProcess(MinaProducer.java:127)[camel-mina-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:77)[camel-mina-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:63)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:366)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:337)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:233)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:337)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.ProducerCache.send(ProducerCache.java:175)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:111)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:124)[camel-core-2.11.1.jar:2.11.1]
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:131)[camel-core-2.11.1.jar:2.11.1]
>     at 
> net.sf.camel.tcp.TCPProducerAsyn.main(TCPProducerAsyn.java:40)[file:/home/shing/work/workspaces/wsa/testcamel/target/classes/:]
> Exception in thread "main" org.apache.camel.CamelExecutionException: 
> Exception occurred during execution on the exchange: Exchange[Message: Hello3]
>     at 
> org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1358)
>     at 
> org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:619)
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:454)
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:450)
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:126)
>     at 
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:131)
>     at net.sf.camel.tcp.TCPProducerAsyn.main(TCPProducerAsyn.java:40)
> Caused by: org.apache.camel.CamelExchangeException: Cannot write body. 
> Exchange[Message: Hello3]
>     at 
> org.apache.camel.component.mina.MinaHelper.writeBody(MinaHelper.java:55)
>     at 
> org.apache.camel.component.mina.MinaProducer.doProcess(MinaProducer.java:127)
>     at 
> org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:77)
>     at 
> org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>     at 
> org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
>     at 
> org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
>     at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>     at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>     at 
> org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>     at 
> org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
>     at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>     at 
> org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
>     at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
>     at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>     at 
> org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
>     at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>     at 
> org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
>     at 
> org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
>     at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:
>
> ----------------------
>
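> import org.apache.camel.CamelContext;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TCPEchoServer {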
>     public static void main(String[] args) throws Exception {
>         // create CamelContext
>         CamelContext context = new DefaultCamelContext();
>
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() {
>                 from("mina://tcp://localhost:6200?textline=true&sync=true&decoderMaxLineLength=10000")
>                         .to("stream:out");
>
>             }
>
>         });
>         // start the route and let it do its work
>         context.start();
>
>         Thread.sleep(600000);
>
>         // stop the CamelContext
>         context.stop();
>
>     }
> }
> -----------------
>
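> import org.apache.camel.CamelContext;
> import org.apache.camel.Component;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.direct.DirectComponent;
> import org.apache.camel.impl.DefaultCamelContext;
>
> public class TCPProducerAsyn {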
>
>     public static void main(String[] args) throws Exception {
>         // create CamelContext
>         CamelContext context = new DefaultCamelContext();
>
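>         // Register the direct component under its URI scheme ("direct"),
>         // which is the name that "direct:start" resolves against.
>         Component direct = new DirectComponent();
>
>         context.addComponent("direct", direct);
>
>         final String directEndPoint = "direct:start";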
>
>         // add our route to the CamelContext
>         context.addRoutes(new RouteBuilder() {
>             @Override
>             public void configure() {
>                 // Forward each message to the TCP endpoint; disconnect=true
>                 // asks Mina to close the connection after the send.
>                 from(directEndPoint)
>                         .to("mina://tcp://localhost:6200?textline=true&sync=false&disconnect=true");
>
>             }
>
>         });
>         // start the route and let it do its work
>         context.start();
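>         ProducerTemplate template = context.createProducerTemplate();
>
>         // Send nine messages. sendBody() blocks until each exchange completes;
>         // sync=false on the endpoint means no reply is awaited.
>
>         for (int i = 1; i < 10; ++i) {
>             template.sendBody(directEndPoint, "Hello" + i);
>         }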
>
>         Thread.sleep(60000);
>
>         // stop the CamelContext
>         context.stop();
>
>     }
>
> }
> -------------------------
>
> Thanks in advance for your assistance !
>
> Shing



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io
