Author: ningjiang
Date: Sun Aug 24 20:03:03 2008
New Revision: 688606
URL: http://svn.apache.org/viewvc?rev=688606&view=rev
Log:
CAMEL-846: Support sending the response message to the return address specified
in the request message's header in CamelTargetAdapter
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java?rev=688606&r1=688605&r2=688606&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
(original)
+++
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
Sun Aug 24 20:03:03 2008
@@ -82,23 +82,9 @@
}
@Override
- public void setHeader(String name, Object value) {
- if (siMessage != null) {
- siMessage.getHeaders().put(name, value);
- } else {
- super.setHeader(name, value);
- }
- }
-
- @Override
public Map<String, Object> getHeaders() {
if (siMessage != null) {
- Map<String, Object> answer = new HashMap<String, Object>();
- MessageHeaders header = siMessage.getHeaders();
- for (String name : header.keySet()) {
- answer.put(name, header.get(name));
- }
- return answer;
+ return siMessage.getHeaders();
} else {
return super.getHeaders();
}
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java?rev=688606&r1=688605&r2=688606&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
(original)
+++
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
Sun Aug 24 20:03:03 2008
@@ -90,15 +90,12 @@
headers.put(MessageHeaders.RETURN_ADDRESS , inputChannel);
}
org.springframework.integration.message.Message siOutmessage =
SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
- if (exchange.getPattern().isInCapable()) {
- //Set the return channel address
- outputChannel.send(siOutmessage);
+ outputChannel.send(siOutmessage);
+ if (exchange.getPattern().isInCapable()) {
org.springframework.integration.message.Message siInMessage =
- inputChannel.receive();
+ inputChannel.receive();
SpringIntegrationBinding.storeToCamelMessage(siInMessage,
exchange.getOut());
- } else {
- outputChannel.send(siOutmessage);
}
}
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java?rev=688606&r1=688605&r2=688606&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
(original)
+++
activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
Sun Aug 24 20:03:03 2008
@@ -26,9 +26,12 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.integration.bus.MessageBus;
+import org.springframework.integration.bus.MessageBusAware;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
+import org.springframework.integration.message.MessageHeaders;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
@@ -40,16 +43,17 @@
*
* @version $Revision$
*/
-public class CamelTargetAdapter extends AbstractCamelAdapter implements
MessageTarget {
+public class CamelTargetAdapter extends AbstractCamelAdapter implements
MessageTarget, MessageBusAware {
private final Log logger = LogFactory.getLog(this.getClass());
private ProducerTemplate<Exchange> camelTemplate;
private MessageChannel replyChannel;
+ private MessageBus messageBus;
public void setReplyChannel(MessageChannel channel) {
replyChannel = channel;
}
-
+
public MessageChannel getReplyChannel() {
return replyChannel;
}
@@ -81,12 +85,22 @@
}
Message response = null;
if (isExpectReply()) {
- // TODO need to check the message header
+ //Check the message header for the return address
response =
SpringIntegrationBinding.storeToSpringIntegrationMessage(outExchange.getOut());
- result = replyChannel.send(response);
+ MessageChannel messageReplyChannel = (MessageChannel)
message.getHeaders().get(MessageHeaders.RETURN_ADDRESS);
+ if (messageReplyChannel != null) {
+ result = messageReplyChannel.send(response);
+ } else {
+ result = replyChannel.send(response);
+ }
}
return result;
}
+ public void setMessageBus(MessageBus mBus) {
+ messageBus = mBus;
+ }
+
+
}
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java?rev=688606&r1=688605&r2=688606&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
(original)
+++
activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
Sun Aug 24 20:03:03 2008
@@ -16,12 +16,17 @@
*/
package org.apache.camel.component.spring.integration.adapter;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
+import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageHeaders;
import org.springframework.integration.message.StringMessage;
public class CamelTargetAdapterTest extends SpringTestSupport {
@@ -47,6 +52,22 @@
assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",
result);
}
+
+ public void testSendingTwoWayMessageWithMessageAddress() throws Exception {
+
+ MessageChannel requestChannel = (MessageChannel)
applicationContext.getBean("channelB");
+ PollableChannel responseChannel = (PollableChannel)
applicationContext.getBean("channelD");
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(MessageHeaders.RETURN_ADDRESS, responseChannel);
+ GenericMessage<String> message = new
GenericMessage<String>(MESSAGE_BODY, headers);
+ requestChannel.send(message);
+
+
+ Message responseMessage = responseChannel.receive();
+ String result = (String) responseMessage.getPayload();
+
+ assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",
result);
+ }
@Override
protected ClassPathXmlApplicationContext createApplicationContext() {
Modified:
activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml?rev=688606&r1=688605&r2=688606&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
(original)
+++
activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
Sun Aug 24 20:03:03 2008
@@ -36,6 +36,7 @@
<channel-adapter id="channelA" target="camelTargetA"/>
<channel-adapter id="channelB" target="camelTargetB"/>
<channel id="channelC"/>
+ <channel id="channelD"/>
<!-- START SNIPPET: example -->
<!-- Create the camel context here -->
<camelContext id="camelTargetContext"
xmlns="http://activemq.apache.org/camel/schema/spring">