This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rabbitmq
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ef22b9eb1a56fd551ab40642dcb1a7f44ab11e1a
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Apr 12 09:15:51 2021 +0200

    CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling
---
 .../springrabbit/EndpointMessageListener.java | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index e43b783..84e5d59 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.springrabbit;
 
+import java.util.Map;
+
 import com.rabbitmq.client.Channel;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -139,6 +141,8 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
 
                 // if we failed processed the exchange from the async callback task, then grab the exception
                 rce = exchange.getException(RuntimeCamelException.class);
+                // release back when synchronous mode
+                consumer.releaseExchange(exchange, false);
             } catch (Exception e) {
                 rce = wrapRuntimeCamelException(e);
             }
@@ -155,9 +159,18 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
     }
 
     protected Exchange createExchange(Message message, Channel channel, Object replyDestination) {
-        Exchange exchange = endpoint.createExchange(message);
+        Exchange exchange = consumer.createExchange(false);
         exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel);
 
+        Object body = endpoint.getMessageConverter().fromMessage(message);
+        exchange.getMessage().setBody(body);
+
+        Map<String, Object> headers
+                = endpoint.getMessagePropertiesConverter().fromMessageProperties(message.getMessageProperties(), exchange);
+        if (!headers.isEmpty()) {
+            exchange.getMessage().setHeaders(headers);
+        }
+
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             // only change pattern if not already out capable
@@ -245,6 +258,11 @@ public class EndpointMessageListener implements ChannelAwareMessageListener {
                 }
             }
         }
+
+        if (!doneSync) {
+            // release back when in asynchronous mode
+            consumer.releaseExchange(exchange, false);
+        }
     }
 
     private void sendReply(Address replyDestination, Message message, Exchange exchange, org.apache.camel.Message out) {
