I am implementing the quorum queue feature of RabbitMQ with the Camel RabbitMQ component. The route configuration is as follows:
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Emit a random number once per second
            from("timer:hello?period=1000")
                .transform(simple("Random number ${random(0,100)}"))
                .end()
                // Publish to the cluster; the queue is declared with x-queue-type=quorum
                .to("rabbitmq:test?addresses=172.17.0.2:5672, 172.17.0.3:5672, 172.17.0.4:5672, 172.17.0.5:5672, 172.17.0.6:5672 &connectionFactory=factory&exchangeType=direct&arg.queue.x-queue-type=quorum&autoDelete=false&queue=hello");
        }
    });

I have set up a five-node RabbitMQ cluster using Docker. The addresses option contains the IP and port of each RabbitMQ node. With the quorum queue, a leader and followers are formed as expected, but when I stop the leader, the producer is unable to switch to another node in the address pool. According to the Camel RabbitMQ component documentation, it should connect to another address in the pool, but that is not happening in this scenario. The error log is attached below.

The same setup works perfectly with HA mirroring on a classic queue type, where it does switch to another node on failure. Please provide some insight into this issue. Thank you.
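To make the expected behaviour concrete, here is a toy model of what I understand "fail over to another address in the pool" to mean. It is only a sketch and not Camel's or the RabbitMQ client's actual code: the class name, the `firstReachable` helper and the `isReachable` probe are hypothetical stand-ins for a real connection attempt.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class AddressFailoverSketch {

    // Returns the first address in the pool that the probe reports as reachable.
    // The probe is a hypothetical stand-in for a real connection attempt.
    static Optional<String> firstReachable(List<String> pool, Predicate<String> isReachable) {
        for (String address : pool) {
            if (isReachable.test(address)) {
                return Optional.of(address);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> pool = List.of(
                "172.17.0.2:5672", "172.17.0.3:5672", "172.17.0.4:5672",
                "172.17.0.5:5672", "172.17.0.6:5672");

        // Pretend the node on 172.17.0.2 (the leader in my test) has been stopped.
        Predicate<String> isReachable = address -> !address.equals("172.17.0.2:5672");

        // Prints 172.17.0.3:5672, the next address in the pool.
        System.out.println(firstReachable(pool, isReachable).orElse("no node reachable"));
    }
}
```

With a classic mirrored queue the producer behaves like this sketch; with the quorum queue it does not get past re-declaring the queue.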
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 3.0.1 (CamelContext: camel-1) is starting
[main] INFO org.apache.camel.impl.engine.DefaultManagementStrategy - JMX is disabled
[main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: timer://hello?period=1000
[main] INFO org.apache.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 are started
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 3.0.1 (CamelContext: camel-1) started in 4.427 seconds
[RabbitMQ Error On Write Thread] WARN com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Connection reset)
[AMQP Connection 172.17.0.2:5672] WARN com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Connection reset)
[Camel (camel-1) thread #1 - timer://hello] WARN org.apache.camel.component.rabbitmq.RabbitMQProducer - Got a closed channel from the pool. Invalidating and borrowing a new one from the pool.
[Camel (camel-1) thread #1 - timer://hello] INFO org.apache.camel.component.rabbitmq.RabbitMQProducer - Reconnecting to RabbitMQ
[Camel (camel-1) thread #1 - timer://hello] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: ID-renish-1583472848118-0-50 on ExchangeId: ID-renish-1583472848118-0-49). Exhausted after delivery attempt: 1 caught: java.io.IOException

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId    ProcessorId    Processor                                                                          Elapsed (ms)
[route1 ]  [route1 ]      [from[timer://hello?period=1000] ]                                                 [ 14]
[route1 ]  [transform1 ]  [transform[Simple: Random number ${random(0,100)}] ]                               [ 0]
[route1 ]  [to1 ]         [rabbitmq:test?addresses=172.17.0.2:5672, 172.17.0.3:5672, 172.17.0.4:5672, 172]   [ 0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindQueue(RabbitMQDeclareSupport.java:132)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangeWithQueue(RabbitMQDeclareSupport.java:55)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangesAndQueuesUsing(RabbitMQDeclareSupport.java:36)
    at org.apache.camel.component.rabbitmq.RabbitMQEndpoint.declareExchangeAndQueue(RabbitMQEndpoint.java:231)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:116)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:113)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:90)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.attemptDeclaration(RabbitMQProducer.java:113)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.openConnectionAndChannelPool(RabbitMQProducer.java:108)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.checkConnectionAndChannelPool(RabbitMQProducer.java:135)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:85)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.basicPublish(RabbitMQProducer.java:285)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.processInOnly(RabbitMQProducer.java:272)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:197)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:193)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:75)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
    ... 26 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)
[Camel (camel-1) thread #1 - timer://hello] WARN org.apache.camel.component.timer.TimerConsumer - Error processing exchange. Exchange[ID-renish-1583472848118-0-49]. Caused by: [java.io.IOException - null]
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindQueue(RabbitMQDeclareSupport.java:132)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangeWithQueue(RabbitMQDeclareSupport.java:55)
    at org.apache.camel.component.rabbitmq.RabbitMQDeclareSupport.declareAndBindExchangesAndQueuesUsing(RabbitMQDeclareSupport.java:36)
    at org.apache.camel.component.rabbitmq.RabbitMQEndpoint.declareExchangeAndQueue(RabbitMQEndpoint.java:231)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:116)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer$1.doWithChannel(RabbitMQProducer.java:113)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:90)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.attemptDeclaration(RabbitMQProducer.java:113)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.openConnectionAndChannelPool(RabbitMQProducer.java:108)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.checkConnectionAndChannelPool(RabbitMQProducer.java:135)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.execute(RabbitMQProducer.java:85)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.basicPublish(RabbitMQProducer.java:285)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.processInOnly(RabbitMQProducer.java:272)
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:197)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:134)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:476)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:185)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:87)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:228)
    at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:193)
    at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:75)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
    ... 26 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'volante' in vhost '/': received none but current is the value 'quorum' of type 'longstr', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)
[Camel (camel-1) thread #1 - timer://hello] WARN org.apache.camel.component.rabbitmq.RabbitMQProducer - Got a closed channel from the pool. Invalidating and borrowing a new one from the pool.
[Camel (camel-1) thread #1 - timer://hello] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: ID-renish-1583472848118-0-52 on ExchangeId: ID-renish-1583472848118-0-51). Exhausted after delivery attempt: 1 caught: java.io.IOException
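
Reading the reply text in the trace, the queue declaration sent after the reconnect seems to have arrived without `x-queue-type` ("received none"), while the existing queue has it set to `quorum`. The toy model below only mimics that comparison so the message is easier to interpret. It is not RabbitMQ's or Camel's code, and the class and the `mismatch` helper are hypothetical names used for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class QueueArgEquivalenceSketch {

    // Compares one argument of a re-declaration against the existing queue.
    // Returns a description of the mismatch, or empty if both sides agree.
    static Optional<String> mismatch(String key,
                                     Map<String, Object> existing,
                                     Map<String, Object> declared) {
        Object current = existing.get(key);
        Object received = declared.get(key);
        if (Objects.equals(current, received)) {
            return Optional.empty();
        }
        return Optional.of("inequivalent arg '" + key + "': received "
                + (received == null ? "none" : "'" + received + "'")
                + " but current is "
                + (current == null ? "none" : "'" + current + "'"));
    }

    public static void main(String[] args) {
        Map<String, Object> existing = Map.of("x-queue-type", "quorum");

        // First declaration carries the argument: no mismatch is reported.
        System.out.println(mismatch("x-queue-type", existing,
                Map.of("x-queue-type", "quorum")).orElse("equivalent"));

        // Re-declaration without the argument, as the trace suggests happened on reconnect.
        System.out.println(mismatch("x-queue-type", existing,
                Map.of()).orElse("equivalent"));
    }
}
```

If this reading is right, the question becomes why the `arg.queue.x-queue-type=quorum` option is not applied when the producer re-declares the queue after reconnecting.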