Repository: camel Updated Branches: refs/heads/master 5485d200f -> 2144590dd
CAMEL-8507 Added Support for the mandatory and immediate flags Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2144590d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2144590d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2144590d Branch: refs/heads/master Commit: 2144590dd83d4eea142faa2a47b5b34b0130a7d3 Parents: 5485d20 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Thu Mar 19 20:30:59 2015 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Mar 19 20:33:04 2015 +0800 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConstants.java | 4 +++- .../component/rabbitmq/RabbitMQEndpoint.java | 20 ++++++++++++++++++++ .../component/rabbitmq/RabbitMQProducer.java | 13 +++++++++---- 3 files changed, 32 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index b1f4a0c..f2e5568 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -17,7 +17,7 @@ package org.apache.camel.component.rabbitmq; public final class RabbitMQConstants { - + // TODO need to change the constant which is start with camel public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY"; public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME"; public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE"; @@ -35,6 +35,8 @@ public final class RabbitMQConstants { public static final String TIMESTAMP = "rabbitmq.TIMESTAMP"; public static final String APP_ID = "rabbitmq.APP_ID"; public static final String REQUEUE = "rabbitmq.REQUEUE"; + public static final String MANDATORY = "rabbitmq.MANDATORY"; + public static final String IMMEDIATE = "rabbitmq.IMMEDIATE"; public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange"; public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index ea311d7..26ad0b5 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -139,6 +139,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint { //Maximum time (in milliseconds) waiting for channel @UriParam(defaultValue = "1000") private long channelPoolMaxWait = 1000; + @UriParam(defaultValue = "false") + private boolean mandatory; + @UriParam(defaultValue = "false") + private boolean immediate; @UriParam private ArgsConfigurer queueArgsConfigurer; @UriParam @@ -633,6 +637,22 @@ public class RabbitMQEndpoint extends DefaultEndpoint { this.channelPoolMaxWait = channelPoolMaxWait; } + public boolean isMandatory() { + return mandatory; + } + + public void setMandatory(boolean mandatory) { + this.mandatory = mandatory; + } + + public boolean isImmediate() { + return immediate; + } + + public void setImmediate(boolean immediate) { + this.immediate = immediate; + } + /** * Get the configurer for setting the queue args in Channel.queueDeclare * @return http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 6652544..28858a6 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -139,8 +139,10 @@ public class RabbitMQProducer extends DefaultProducer { } byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); AMQP.BasicProperties properties = buildProperties(exchange).build(); - - basicPublish(exchangeName, key, properties, messageBodyBytes); + Boolean mandatory = exchange.getIn().getHeader(RabbitMQConstants.MANDATORY, getEndpoint().isMandatory(), Boolean.class); + Boolean immediate = exchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, getEndpoint().isImmediate(), Boolean.class); + + basicPublish(exchangeName, key, mandatory, immediate, properties, messageBodyBytes); } /** @@ -148,10 +150,13 @@ public class RabbitMQProducer extends DefaultProducer { * * @param exchange Target exchange * @param routingKey Routing key + * @param mandatory This flag tells the server how to react if the message cannot be routed to a queue. + * @param immediate This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. * @param properties Header properties * @param body Body content */ - private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception { + private void basicPublish(final String exchange, final String routingKey, final boolean mandatory, final boolean immediate, + final AMQP.BasicProperties properties, final byte[] body) throws Exception { if (channelPool == null) { // Open connection and channel lazily openConnectionAndChannelPool(); @@ -159,7 +164,7 @@ public class RabbitMQProducer extends DefaultProducer { execute(new ChannelCallback<Void>() { @Override public Void doWithChannel(Channel channel) throws Exception { - channel.basicPublish(exchange, routingKey, properties, body); + channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body); return null; } });